UNPKG

10.3 kBPlain TextView Raw
1/*
2 * Copyright 2022 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { CallCredentials } from './call-credentials';
19import {
20 Call,
21 CallStreamOptions,
22 InterceptingListener,
23 MessageContext,
24 StatusObject,
25} from './call-interface';
26import { LogVerbosity, Propagate, Status } from './constants';
27import {
28 Deadline,
29 deadlineToString,
30 getRelativeTimeout,
31 minDeadline,
32} from './deadline';
33import { FilterStack, FilterStackFactory } from './filter-stack';
34import { InternalChannel } from './internal-channel';
35import { Metadata } from './metadata';
36import * as logging from './logging';
37import { restrictControlPlaneStatusCode } from './control-plane-status';
38
39const TRACER_NAME = 'resolving_call';
40
41export class ResolvingCall implements Call {
42 private child: Call | null = null;
43 private readPending = false;
44 private pendingMessage: { context: MessageContext; message: Buffer } | null =
45 null;
46 private pendingHalfClose = false;
47 private ended = false;
48 private readFilterPending = false;
49 private writeFilterPending = false;
50 private pendingChildStatus: StatusObject | null = null;
51 private metadata: Metadata | null = null;
52 private listener: InterceptingListener | null = null;
53 private deadline: Deadline;
54 private host: string;
55 private statusWatchers: ((status: StatusObject) => void)[] = [];
56 private deadlineTimer: NodeJS.Timeout = setTimeout(() => {}, 0);
57 private filterStack: FilterStack | null = null;
58
59 constructor(
60 private readonly channel: InternalChannel,
61 private readonly method: string,
62 options: CallStreamOptions,
63 private readonly filterStackFactory: FilterStackFactory,
64 private credentials: CallCredentials,
65 private callNumber: number
66 ) {
67 this.deadline = options.deadline;
68 this.host = options.host;
69 if (options.parentCall) {
70 if (options.flags & Propagate.CANCELLATION) {
71 options.parentCall.on('cancelled', () => {
72 this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
73 });
74 }
75 if (options.flags & Propagate.DEADLINE) {
76 this.trace(
77 'Propagating deadline from parent: ' +
78 options.parentCall.getDeadline()
79 );
80 this.deadline = minDeadline(
81 this.deadline,
82 options.parentCall.getDeadline()
83 );
84 }
85 }
86 this.trace('Created');
87 this.runDeadlineTimer();
88 }
89
90 private trace(text: string): void {
91 logging.trace(
92 LogVerbosity.DEBUG,
93 TRACER_NAME,
94 '[' + this.callNumber + '] ' + text
95 );
96 }
97
98 private runDeadlineTimer() {
99 clearTimeout(this.deadlineTimer);
100 this.trace('Deadline: ' + deadlineToString(this.deadline));
101 const timeout = getRelativeTimeout(this.deadline);
102 if (timeout !== Infinity) {
103 this.trace('Deadline will be reached in ' + timeout + 'ms');
104 const handleDeadline = () => {
105 this.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
106 };
107 if (timeout <= 0) {
108 process.nextTick(handleDeadline);
109 } else {
110 this.deadlineTimer = setTimeout(handleDeadline, timeout);
111 }
112 }
113 }
114
115 private outputStatus(status: StatusObject) {
116 if (!this.ended) {
117 this.ended = true;
118 if (!this.filterStack) {
119 this.filterStack = this.filterStackFactory.createFilter();
120 }
121 clearTimeout(this.deadlineTimer);
122 const filteredStatus = this.filterStack.receiveTrailers(status);
123 this.trace(
124 'ended with status: code=' +
125 filteredStatus.code +
126 ' details="' +
127 filteredStatus.details +
128 '"'
129 );
130 this.statusWatchers.forEach(watcher => watcher(filteredStatus));
131 process.nextTick(() => {
132 this.listener?.onReceiveStatus(filteredStatus);
133 });
134 }
135 }
136
137 private sendMessageOnChild(context: MessageContext, message: Buffer): void {
138 if (!this.child) {
139 throw new Error('sendMessageonChild called with child not populated');
140 }
141 const child = this.child;
142 this.writeFilterPending = true;
143 this.filterStack!.sendMessage(
144 Promise.resolve({ message: message, flags: context.flags })
145 ).then(
146 filteredMessage => {
147 this.writeFilterPending = false;
148 child.sendMessageWithContext(context, filteredMessage.message);
149 if (this.pendingHalfClose) {
150 child.halfClose();
151 }
152 },
153 (status: StatusObject) => {
154 this.cancelWithStatus(status.code, status.details);
155 }
156 );
157 }
158
159 getConfig(): void {
160 if (this.ended) {
161 return;
162 }
163 if (!this.metadata || !this.listener) {
164 throw new Error('getConfig called before start');
165 }
166 const configResult = this.channel.getConfig(this.method, this.metadata);
167 if (configResult.type === 'NONE') {
168 this.channel.queueCallForConfig(this);
169 return;
170 } else if (configResult.type === 'ERROR') {
171 if (this.metadata.getOptions().waitForReady) {
172 this.channel.queueCallForConfig(this);
173 } else {
174 this.outputStatus(configResult.error);
175 }
176 return;
177 }
178 // configResult.type === 'SUCCESS'
179 const config = configResult.config;
180 if (config.status !== Status.OK) {
181 const { code, details } = restrictControlPlaneStatusCode(
182 config.status,
183 'Failed to route call to method ' + this.method
184 );
185 this.outputStatus({
186 code: code,
187 details: details,
188 metadata: new Metadata(),
189 });
190 return;
191 }
192
193 if (config.methodConfig.timeout) {
194 const configDeadline = new Date();
195 configDeadline.setSeconds(
196 configDeadline.getSeconds() + config.methodConfig.timeout.seconds
197 );
198 configDeadline.setMilliseconds(
199 configDeadline.getMilliseconds() +
200 config.methodConfig.timeout.nanos / 1_000_000
201 );
202 this.deadline = minDeadline(this.deadline, configDeadline);
203 this.runDeadlineTimer();
204 }
205
206 this.filterStackFactory.push(config.dynamicFilterFactories);
207 this.filterStack = this.filterStackFactory.createFilter();
208 this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(
209 filteredMetadata => {
210 this.child = this.channel.createInnerCall(
211 config,
212 this.method,
213 this.host,
214 this.credentials,
215 this.deadline
216 );
217 this.trace('Created child [' + this.child.getCallNumber() + ']');
218 this.child.start(filteredMetadata, {
219 onReceiveMetadata: metadata => {
220 this.trace('Received metadata');
221 this.listener!.onReceiveMetadata(
222 this.filterStack!.receiveMetadata(metadata)
223 );
224 },
225 onReceiveMessage: message => {
226 this.trace('Received message');
227 this.readFilterPending = true;
228 this.filterStack!.receiveMessage(message).then(
229 filteredMesssage => {
230 this.trace('Finished filtering received message');
231 this.readFilterPending = false;
232 this.listener!.onReceiveMessage(filteredMesssage);
233 if (this.pendingChildStatus) {
234 this.outputStatus(this.pendingChildStatus);
235 }
236 },
237 (status: StatusObject) => {
238 this.cancelWithStatus(status.code, status.details);
239 }
240 );
241 },
242 onReceiveStatus: status => {
243 this.trace('Received status');
244 if (this.readFilterPending) {
245 this.pendingChildStatus = status;
246 } else {
247 this.outputStatus(status);
248 }
249 },
250 });
251 if (this.readPending) {
252 this.child.startRead();
253 }
254 if (this.pendingMessage) {
255 this.sendMessageOnChild(
256 this.pendingMessage.context,
257 this.pendingMessage.message
258 );
259 } else if (this.pendingHalfClose) {
260 this.child.halfClose();
261 }
262 },
263 (status: StatusObject) => {
264 this.outputStatus(status);
265 }
266 );
267 }
268
269 reportResolverError(status: StatusObject) {
270 if (this.metadata?.getOptions().waitForReady) {
271 this.channel.queueCallForConfig(this);
272 } else {
273 this.outputStatus(status);
274 }
275 }
276 cancelWithStatus(status: Status, details: string): void {
277 this.trace(
278 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
279 );
280 this.child?.cancelWithStatus(status, details);
281 this.outputStatus({
282 code: status,
283 details: details,
284 metadata: new Metadata(),
285 });
286 }
287 getPeer(): string {
288 return this.child?.getPeer() ?? this.channel.getTarget();
289 }
290 start(metadata: Metadata, listener: InterceptingListener): void {
291 this.trace('start called');
292 this.metadata = metadata.clone();
293 this.listener = listener;
294 this.getConfig();
295 }
296 sendMessageWithContext(context: MessageContext, message: Buffer): void {
297 this.trace('write() called with message of length ' + message.length);
298 if (this.child) {
299 this.sendMessageOnChild(context, message);
300 } else {
301 this.pendingMessage = { context, message };
302 }
303 }
304 startRead(): void {
305 this.trace('startRead called');
306 if (this.child) {
307 this.child.startRead();
308 } else {
309 this.readPending = true;
310 }
311 }
312 halfClose(): void {
313 this.trace('halfClose called');
314 if (this.child && !this.writeFilterPending) {
315 this.child.halfClose();
316 } else {
317 this.pendingHalfClose = true;
318 }
319 }
320 setCredentials(credentials: CallCredentials): void {
321 this.credentials = this.credentials.compose(credentials);
322 }
323
324 addStatusWatcher(watcher: (status: StatusObject) => void) {
325 this.statusWatchers.push(watcher);
326 }
327
328 getCallNumber(): number {
329 return this.callNumber;
330 }
331}