UNPKG

11.8 kBPlain TextView Raw
1/*
2 * Copyright 2022 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { CallCredentials } from './call-credentials';
19import {
20 Call,
21 CallStreamOptions,
22 DeadlineInfoProvider,
23 InterceptingListener,
24 MessageContext,
25 StatusObject,
26} from './call-interface';
27import { LogVerbosity, Propagate, Status } from './constants';
28import {
29 Deadline,
30 deadlineToString,
31 formatDateDifference,
32 getRelativeTimeout,
33 minDeadline,
34} from './deadline';
35import { FilterStack, FilterStackFactory } from './filter-stack';
36import { InternalChannel } from './internal-channel';
37import { Metadata } from './metadata';
38import * as logging from './logging';
39import { restrictControlPlaneStatusCode } from './control-plane-status';
40
41const TRACER_NAME = 'resolving_call';
42
43export class ResolvingCall implements Call {
44 private child: (Call & DeadlineInfoProvider) | null = null;
45 private readPending = false;
46 private pendingMessage: { context: MessageContext; message: Buffer } | null =
47 null;
48 private pendingHalfClose = false;
49 private ended = false;
50 private readFilterPending = false;
51 private writeFilterPending = false;
52 private pendingChildStatus: StatusObject | null = null;
53 private metadata: Metadata | null = null;
54 private listener: InterceptingListener | null = null;
55 private deadline: Deadline;
56 private host: string;
57 private statusWatchers: ((status: StatusObject) => void)[] = [];
58 private deadlineTimer: NodeJS.Timeout = setTimeout(() => {}, 0);
59 private filterStack: FilterStack | null = null;
60
61 private deadlineStartTime: Date | null = null;
62 private configReceivedTime: Date | null = null;
63 private childStartTime: Date | null = null;
64
65 constructor(
66 private readonly channel: InternalChannel,
67 private readonly method: string,
68 options: CallStreamOptions,
69 private readonly filterStackFactory: FilterStackFactory,
70 private credentials: CallCredentials,
71 private callNumber: number
72 ) {
73 this.deadline = options.deadline;
74 this.host = options.host;
75 if (options.parentCall) {
76 if (options.flags & Propagate.CANCELLATION) {
77 options.parentCall.on('cancelled', () => {
78 this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
79 });
80 }
81 if (options.flags & Propagate.DEADLINE) {
82 this.trace(
83 'Propagating deadline from parent: ' +
84 options.parentCall.getDeadline()
85 );
86 this.deadline = minDeadline(
87 this.deadline,
88 options.parentCall.getDeadline()
89 );
90 }
91 }
92 this.trace('Created');
93 this.runDeadlineTimer();
94 }
95
96 private trace(text: string): void {
97 logging.trace(
98 LogVerbosity.DEBUG,
99 TRACER_NAME,
100 '[' + this.callNumber + '] ' + text
101 );
102 }
103
104 private runDeadlineTimer() {
105 clearTimeout(this.deadlineTimer);
106 this.deadlineStartTime = new Date();
107 this.trace('Deadline: ' + deadlineToString(this.deadline));
108 const timeout = getRelativeTimeout(this.deadline);
109 if (timeout !== Infinity) {
110 this.trace('Deadline will be reached in ' + timeout + 'ms');
111 const handleDeadline = () => {
112 if (!this.deadlineStartTime) {
113 this.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
114 return;
115 }
116 const deadlineInfo: string[] = [];
117 const deadlineEndTime = new Date();
118 deadlineInfo.push(`Deadline exceeded after ${formatDateDifference(this.deadlineStartTime, deadlineEndTime)}`);
119 if (this.configReceivedTime) {
120 if (this.configReceivedTime > this.deadlineStartTime) {
121 deadlineInfo.push(`name resolution: ${formatDateDifference(this.deadlineStartTime, this.configReceivedTime)}`);
122 }
123 if (this.childStartTime) {
124 if (this.childStartTime > this.configReceivedTime) {
125 deadlineInfo.push(`metadata filters: ${formatDateDifference(this.configReceivedTime, this.childStartTime)}`);
126 }
127 } else {
128 deadlineInfo.push('waiting for metadata filters');
129 }
130 } else {
131 deadlineInfo.push('waiting for name resolution');
132 }
133 if (this.child) {
134 deadlineInfo.push(...this.child.getDeadlineInfo());
135 }
136 this.cancelWithStatus(Status.DEADLINE_EXCEEDED, deadlineInfo.join(','));
137 };
138 if (timeout <= 0) {
139 process.nextTick(handleDeadline);
140 } else {
141 this.deadlineTimer = setTimeout(handleDeadline, timeout);
142 }
143 }
144 }
145
146 private outputStatus(status: StatusObject) {
147 if (!this.ended) {
148 this.ended = true;
149 if (!this.filterStack) {
150 this.filterStack = this.filterStackFactory.createFilter();
151 }
152 clearTimeout(this.deadlineTimer);
153 const filteredStatus = this.filterStack.receiveTrailers(status);
154 this.trace(
155 'ended with status: code=' +
156 filteredStatus.code +
157 ' details="' +
158 filteredStatus.details +
159 '"'
160 );
161 this.statusWatchers.forEach(watcher => watcher(filteredStatus));
162 process.nextTick(() => {
163 this.listener?.onReceiveStatus(filteredStatus);
164 });
165 }
166 }
167
168 private sendMessageOnChild(context: MessageContext, message: Buffer): void {
169 if (!this.child) {
170 throw new Error('sendMessageonChild called with child not populated');
171 }
172 const child = this.child;
173 this.writeFilterPending = true;
174 this.filterStack!.sendMessage(
175 Promise.resolve({ message: message, flags: context.flags })
176 ).then(
177 filteredMessage => {
178 this.writeFilterPending = false;
179 child.sendMessageWithContext(context, filteredMessage.message);
180 if (this.pendingHalfClose) {
181 child.halfClose();
182 }
183 },
184 (status: StatusObject) => {
185 this.cancelWithStatus(status.code, status.details);
186 }
187 );
188 }
189
190 getConfig(): void {
191 if (this.ended) {
192 return;
193 }
194 if (!this.metadata || !this.listener) {
195 throw new Error('getConfig called before start');
196 }
197 const configResult = this.channel.getConfig(this.method, this.metadata);
198 if (configResult.type === 'NONE') {
199 this.channel.queueCallForConfig(this);
200 return;
201 } else if (configResult.type === 'ERROR') {
202 if (this.metadata.getOptions().waitForReady) {
203 this.channel.queueCallForConfig(this);
204 } else {
205 this.outputStatus(configResult.error);
206 }
207 return;
208 }
209 // configResult.type === 'SUCCESS'
210 this.configReceivedTime = new Date();
211 const config = configResult.config;
212 if (config.status !== Status.OK) {
213 const { code, details } = restrictControlPlaneStatusCode(
214 config.status,
215 'Failed to route call to method ' + this.method
216 );
217 this.outputStatus({
218 code: code,
219 details: details,
220 metadata: new Metadata(),
221 });
222 return;
223 }
224
225 if (config.methodConfig.timeout) {
226 const configDeadline = new Date();
227 configDeadline.setSeconds(
228 configDeadline.getSeconds() + config.methodConfig.timeout.seconds
229 );
230 configDeadline.setMilliseconds(
231 configDeadline.getMilliseconds() +
232 config.methodConfig.timeout.nanos / 1_000_000
233 );
234 this.deadline = minDeadline(this.deadline, configDeadline);
235 this.runDeadlineTimer();
236 }
237
238 this.filterStackFactory.push(config.dynamicFilterFactories);
239 this.filterStack = this.filterStackFactory.createFilter();
240 this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(
241 filteredMetadata => {
242 this.child = this.channel.createInnerCall(
243 config,
244 this.method,
245 this.host,
246 this.credentials,
247 this.deadline
248 );
249 this.trace('Created child [' + this.child.getCallNumber() + ']');
250 this.childStartTime = new Date();
251 this.child.start(filteredMetadata, {
252 onReceiveMetadata: metadata => {
253 this.trace('Received metadata');
254 this.listener!.onReceiveMetadata(
255 this.filterStack!.receiveMetadata(metadata)
256 );
257 },
258 onReceiveMessage: message => {
259 this.trace('Received message');
260 this.readFilterPending = true;
261 this.filterStack!.receiveMessage(message).then(
262 filteredMesssage => {
263 this.trace('Finished filtering received message');
264 this.readFilterPending = false;
265 this.listener!.onReceiveMessage(filteredMesssage);
266 if (this.pendingChildStatus) {
267 this.outputStatus(this.pendingChildStatus);
268 }
269 },
270 (status: StatusObject) => {
271 this.cancelWithStatus(status.code, status.details);
272 }
273 );
274 },
275 onReceiveStatus: status => {
276 this.trace('Received status');
277 if (this.readFilterPending) {
278 this.pendingChildStatus = status;
279 } else {
280 this.outputStatus(status);
281 }
282 },
283 });
284 if (this.readPending) {
285 this.child.startRead();
286 }
287 if (this.pendingMessage) {
288 this.sendMessageOnChild(
289 this.pendingMessage.context,
290 this.pendingMessage.message
291 );
292 } else if (this.pendingHalfClose) {
293 this.child.halfClose();
294 }
295 },
296 (status: StatusObject) => {
297 this.outputStatus(status);
298 }
299 );
300 }
301
302 reportResolverError(status: StatusObject) {
303 if (this.metadata?.getOptions().waitForReady) {
304 this.channel.queueCallForConfig(this);
305 } else {
306 this.outputStatus(status);
307 }
308 }
309 cancelWithStatus(status: Status, details: string): void {
310 this.trace(
311 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
312 );
313 this.child?.cancelWithStatus(status, details);
314 this.outputStatus({
315 code: status,
316 details: details,
317 metadata: new Metadata(),
318 });
319 }
320 getPeer(): string {
321 return this.child?.getPeer() ?? this.channel.getTarget();
322 }
323 start(metadata: Metadata, listener: InterceptingListener): void {
324 this.trace('start called');
325 this.metadata = metadata.clone();
326 this.listener = listener;
327 this.getConfig();
328 }
329 sendMessageWithContext(context: MessageContext, message: Buffer): void {
330 this.trace('write() called with message of length ' + message.length);
331 if (this.child) {
332 this.sendMessageOnChild(context, message);
333 } else {
334 this.pendingMessage = { context, message };
335 }
336 }
337 startRead(): void {
338 this.trace('startRead called');
339 if (this.child) {
340 this.child.startRead();
341 } else {
342 this.readPending = true;
343 }
344 }
345 halfClose(): void {
346 this.trace('halfClose called');
347 if (this.child && !this.writeFilterPending) {
348 this.child.halfClose();
349 } else {
350 this.pendingHalfClose = true;
351 }
352 }
353 setCredentials(credentials: CallCredentials): void {
354 this.credentials = this.credentials.compose(credentials);
355 }
356
357 addStatusWatcher(watcher: (status: StatusObject) => void) {
358 this.statusWatchers.push(watcher);
359 }
360
361 getCallNumber(): number {
362 return this.callNumber;
363 }
364}