UNPKG

9.86 kBPlain TextView Raw
1/*
2 * Copyright 2022 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { CallCredentials } from "./call-credentials";
19import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
20import { LogVerbosity, Propagate, Status } from "./constants";
21import { Deadline, deadlineToString, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
22import { FilterStack, FilterStackFactory } from "./filter-stack";
23import { InternalChannel } from "./internal-channel";
24import { Metadata } from "./metadata";
25import * as logging from './logging';
26import { restrictControlPlaneStatusCode } from "./control-plane-status";
27
/** Tracer name passed to `logging.trace` for every trace line emitted by this file. */
const TRACER_NAME = 'resolving_call';
29
/**
 * A {@link Call} that defers creating the real ("child") call until the
 * channel can supply a config for the method.
 *
 * `start()` asks the channel for a config. While no child exists, a read
 * request, one outgoing message and a half-close are remembered in
 * `readPending`, `pendingMessage` and `pendingHalfClose` and replayed once the
 * child has been created. Outgoing metadata and messages, and incoming
 * metadata, messages and the final status, all pass through a `FilterStack`
 * obtained from `filterStackFactory`.
 */
export class ResolvingCall implements Call {
  /**
   * Largest delay, in milliseconds, that `setTimeout` honours (2^31 - 1).
   * Node.js replaces any larger delay with 1ms, so longer waits have to be
   * split into several timers.
   */
  private static readonly MAX_TIMER_DELAY_MS = 2_147_483_647;

  /** The underlying call; `null` until `getConfig()` has created it. */
  private child: Call | null = null;
  /** Set when `startRead()` was called before the child existed. */
  private readPending = false;
  /** Message passed to `sendMessageWithContext()` before the child existed. */
  private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
  /** Set when `halfClose()` could not be forwarded to the child right away. */
  private pendingHalfClose = false;
  /** Set once the final status has been output; later statuses are ignored. */
  private ended = false;
  /** True while a received message is going through the filter stack. */
  private readFilterPending = false;
  /** True while an outgoing message is going through the filter stack. */
  private writeFilterPending = false;
  /** Child status held back until the in-flight received message is delivered. */
  private pendingChildStatus: StatusObject | null = null;
  /** Clone of the metadata passed to `start()`. */
  private metadata: Metadata | null = null;
  /** Listener passed to `start()`. */
  private listener: InterceptingListener | null = null;
  /** Effective deadline; may be tightened by the parent call and the method config. */
  private deadline: Deadline;
  private host: string;
  /** Callbacks invoked synchronously with the filtered final status. */
  private statusWatchers: ((status: StatusObject) => void)[] = [];
  /** Timer enforcing `deadline`; starts as a no-op timer so it can always be cleared. */
  private deadlineTimer: ReturnType<typeof setTimeout> = setTimeout(() => {}, 0);
  /** Filter stack for this call; created in `getConfig()` or, failing that, in `outputStatus()`. */
  private filterStack: FilterStack | null = null;

  /**
   * @param channel Channel that supplies configs and creates the child call.
   * @param method Name of the method being called.
   * @param options Supplies the deadline and host, plus the optional parent
   *     call and the propagation flags that apply to it.
   * @param filterStackFactory Factory for this call's filter stack; the
   *     config's dynamic filter factories are pushed onto it in `getConfig()`.
   * @param credentials Call credentials handed to the child call.
   * @param callNumber Identifier used to prefix this call's trace output.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly method: string,
    options: CallStreamOptions,
    private readonly filterStackFactory: FilterStackFactory,
    private credentials: CallCredentials,
    private callNumber: number
  ) {
    this.deadline = options.deadline;
    this.host = options.host;
    if (options.parentCall) {
      if (options.flags & Propagate.CANCELLATION) {
        options.parentCall.on('cancelled', () => {
          this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
        });
      }
      if (options.flags & Propagate.DEADLINE) {
        this.trace('Propagating deadline from parent: ' + options.parentCall.getDeadline());
        this.deadline = minDeadline(this.deadline, options.parentCall.getDeadline());
      }
    }
    this.trace('Created');
    this.runDeadlineTimer();
  }

  /** Emits a DEBUG trace line prefixed with this call's number. */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * (Re)arms the timer that cancels the call with DEADLINE_EXCEEDED once
   * `this.deadline` is reached. Any previously armed timer is cleared first.
   *
   * No timer is armed when the relative timeout is `Infinity`. A timeout that
   * is already zero or negative cancels on the next tick.
   */
  private runDeadlineTimer() {
    clearTimeout(this.deadlineTimer);
    this.trace('Deadline: ' + deadlineToString(this.deadline));
    const timeout = getRelativeTimeout(this.deadline);
    if (timeout === Infinity) {
      return;
    }
    this.trace('Deadline will be reached in ' + timeout + 'ms');
    const handleDeadline = () => {
      this.cancelWithStatus(
        Status.DEADLINE_EXCEEDED,
        'Deadline exceeded'
      );
    };
    if (timeout <= 0) {
      process.nextTick(handleDeadline);
    } else if (timeout > ResolvingCall.MAX_TIMER_DELAY_MS) {
      // setTimeout would turn this delay into 1ms and fire almost immediately.
      // Wait for the longest supported delay instead, then re-evaluate the
      // remaining time. outputStatus() clears this timer when the call ends.
      this.deadlineTimer = setTimeout(() => {
        this.runDeadlineTimer();
      }, ResolvingCall.MAX_TIMER_DELAY_MS);
    } else {
      this.deadlineTimer = setTimeout(handleDeadline, timeout);
    }
  }

  /**
   * Ends the call with `status`; does nothing if the call has already ended.
   *
   * The status is passed through the filter stack (one is created here if
   * `getConfig()` never got that far), status watchers are notified
   * synchronously, and the listener is notified on the next tick.
   */
  private outputStatus(status: StatusObject) {
    if (this.ended) {
      return;
    }
    this.ended = true;
    if (!this.filterStack) {
      this.filterStack = this.filterStackFactory.createFilter();
    }
    clearTimeout(this.deadlineTimer);
    const filteredStatus = this.filterStack.receiveTrailers(status);
    this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
    this.statusWatchers.forEach(watcher => watcher(filteredStatus));
    process.nextTick(() => {
      this.listener?.onReceiveStatus(filteredStatus);
    });
  }

  /**
   * Runs `message` through the filter stack and forwards the result to the
   * child, followed by a half-close if one was requested in the meantime.
   * A filter rejection cancels the call with the rejected status.
   *
   * @throws Error if the child call has not been created yet.
   */
  private sendMessageOnChild(context: MessageContext, message: Buffer): void {
    if (!this.child) {
      throw new Error('sendMessageonChild called with child not populated');
    }
    const child = this.child;
    this.writeFilterPending = true;
    this.filterStack!.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
      this.writeFilterPending = false;
      child.sendMessageWithContext(context, filteredMessage.message);
      if (this.pendingHalfClose) {
        child.halfClose();
      }
    }, (status: StatusObject) => {
      this.cancelWithStatus(status.code, status.details);
    });
  }

  /**
   * Asks the channel for a config and, on success, creates and starts the
   * child call, replaying any operations requested before it existed.
   *
   * - No config yet: the call is queued on the channel for a later config.
   * - Config error: the call is queued again if the metadata options request
   *   `waitForReady`, otherwise it ends with the error.
   * - Config with a non-OK status: the call ends with the restricted status.
   *
   * Does nothing if the call has already ended.
   *
   * @throws Error if called before `start()`.
   */
  getConfig(): void {
    if (this.ended) {
      return;
    }
    if (!this.metadata || !this.listener) {
      throw new Error('getConfig called before start');
    }
    const configResult = this.channel.getConfig(this.method, this.metadata);
    if (configResult.type === 'NONE') {
      this.channel.queueCallForConfig(this);
      return;
    } else if (configResult.type === 'ERROR') {
      if (this.metadata.getOptions().waitForReady) {
        this.channel.queueCallForConfig(this);
      } else {
        this.outputStatus(configResult.error);
      }
      return;
    }
    // configResult.type === 'SUCCESS'
    const config = configResult.config;
    if (config.status !== Status.OK) {
      const {code, details} = restrictControlPlaneStatusCode(config.status, 'Failed to route call to method ' + this.method);
      this.outputStatus({
        code: code,
        details: details,
        metadata: new Metadata()
      });
      return;
    }

    if (config.methodConfig.timeout) {
      // Turn the configured timeout into an absolute deadline starting now,
      // combine it with the current deadline and re-arm the timer.
      const configDeadline = new Date();
      configDeadline.setSeconds(
        configDeadline.getSeconds() + config.methodConfig.timeout.seconds
      );
      configDeadline.setMilliseconds(
        configDeadline.getMilliseconds() +
          config.methodConfig.timeout.nanos / 1_000_000
      );
      this.deadline = minDeadline(this.deadline, configDeadline);
      this.runDeadlineTimer();
    }

    this.filterStackFactory.push(config.dynamicFilterFactories);
    this.filterStack = this.filterStackFactory.createFilter();
    this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
      if (this.ended) {
        // The call ended (cancellation, deadline, ...) while the metadata was
        // being filtered. The final status has already been output, so a
        // child created now would never be cancelled and could deliver
        // metadata or messages to the listener after that status.
        return;
      }
      this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
      this.trace('Created child [' + this.child.getCallNumber() + ']');
      this.child.start(filteredMetadata, {
        onReceiveMetadata: metadata => {
          this.trace('Received metadata');
          this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
        },
        onReceiveMessage: message => {
          this.trace('Received message');
          this.readFilterPending = true;
          this.filterStack!.receiveMessage(message).then(filteredMessage => {
            this.trace('Finished filtering received message');
            this.readFilterPending = false;
            this.listener!.onReceiveMessage(filteredMessage);
            // A status that arrived while the message was being filtered was
            // held back so that it is output after the message.
            if (this.pendingChildStatus) {
              this.outputStatus(this.pendingChildStatus);
            }
          }, (status: StatusObject) => {
            this.cancelWithStatus(status.code, status.details);
          });
        },
        onReceiveStatus: status => {
          this.trace('Received status');
          if (this.readFilterPending) {
            this.pendingChildStatus = status;
          } else {
            this.outputStatus(status);
          }
        }
      });
      // Replay the operations that were requested before the child existed.
      if (this.readPending) {
        this.child.startRead();
      }
      if (this.pendingMessage) {
        // sendMessageOnChild() also forwards a pending half-close.
        this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message);
      } else if (this.pendingHalfClose) {
        this.child.halfClose();
      }
    }, (status: StatusObject) => {
      this.outputStatus(status);
    });
  }

  /**
   * Reports a resolver error for this call: the call is queued for a config
   * again if its metadata options request `waitForReady`, otherwise it ends
   * with `status`.
   */
  reportResolverError(status: StatusObject) {
    if (this.metadata?.getOptions().waitForReady) {
      this.channel.queueCallForConfig(this);
    } else {
      this.outputStatus(status);
    }
  }

  /**
   * Cancels the child call, if there is one, and ends this call with the
   * given status code, details and empty metadata.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
    this.child?.cancelWithStatus(status, details);
    this.outputStatus({code: status, details: details, metadata: new Metadata()});
  }

  /** Returns the child's peer, falling back to the channel target. */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Starts the call: stores a clone of `metadata` and the `listener`, then
   * attempts to obtain a config.
   */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.metadata = metadata.clone();
    this.listener = listener;
    this.getConfig();
  }

  /**
   * Sends `message` on the child, or remembers it until the child has been
   * created.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.sendMessageOnChild(context, message);
    } else {
      this.pendingMessage = {context, message};
    }
  }

  /** Requests a read on the child, or remembers the request until it exists. */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /**
   * Half-closes the child. The half-close is deferred while there is no child
   * or while an outgoing message is still being filtered, so that it is
   * forwarded after that message.
   */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child && !this.writeFilterPending) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /** Composes `credentials` onto the credentials this call already holds. */
  setCredentials(credentials: CallCredentials): void {
    this.credentials = this.credentials.compose(credentials);
  }

  /** Registers a callback that receives the filtered final status. */
  addStatusWatcher(watcher: (status: StatusObject) => void) {
    this.statusWatchers.push(watcher);
  }

  /** Returns the call number given to the constructor. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
\No newline at end of file