1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from './call-credentials';
|
19 | import {
|
20 | Call,
|
21 | CallStreamOptions,
|
22 | DeadlineInfoProvider,
|
23 | InterceptingListener,
|
24 | MessageContext,
|
25 | StatusObject,
|
26 | } from './call-interface';
|
27 | import { LogVerbosity, Propagate, Status } from './constants';
|
28 | import {
|
29 | Deadline,
|
30 | deadlineToString,
|
31 | formatDateDifference,
|
32 | getRelativeTimeout,
|
33 | minDeadline,
|
34 | } from './deadline';
|
35 | import { FilterStack, FilterStackFactory } from './filter-stack';
|
36 | import { InternalChannel } from './internal-channel';
|
37 | import { Metadata } from './metadata';
|
38 | import * as logging from './logging';
|
39 | import { restrictControlPlaneStatusCode } from './control-plane-status';
|
40 |
|
41 | const TRACER_NAME = 'resolving_call';
|
42 |
|
43 | export class ResolvingCall implements Call {
|
44 | private child: (Call & DeadlineInfoProvider) | null = null;
|
45 | private readPending = false;
|
46 | private pendingMessage: { context: MessageContext; message: Buffer } | null =
|
47 | null;
|
48 | private pendingHalfClose = false;
|
49 | private ended = false;
|
50 | private readFilterPending = false;
|
51 | private writeFilterPending = false;
|
52 | private pendingChildStatus: StatusObject | null = null;
|
53 | private metadata: Metadata | null = null;
|
54 | private listener: InterceptingListener | null = null;
|
55 | private deadline: Deadline;
|
56 | private host: string;
|
57 | private statusWatchers: ((status: StatusObject) => void)[] = [];
|
58 | private deadlineTimer: NodeJS.Timeout = setTimeout(() => {}, 0);
|
59 | private filterStack: FilterStack | null = null;
|
60 |
|
61 | private deadlineStartTime: Date | null = null;
|
62 | private configReceivedTime: Date | null = null;
|
63 | private childStartTime: Date | null = null;
|
64 |
|
65 | constructor(
|
66 | private readonly channel: InternalChannel,
|
67 | private readonly method: string,
|
68 | options: CallStreamOptions,
|
69 | private readonly filterStackFactory: FilterStackFactory,
|
70 | private credentials: CallCredentials,
|
71 | private callNumber: number
|
72 | ) {
|
73 | this.deadline = options.deadline;
|
74 | this.host = options.host;
|
75 | if (options.parentCall) {
|
76 | if (options.flags & Propagate.CANCELLATION) {
|
77 | options.parentCall.on('cancelled', () => {
|
78 | this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
|
79 | });
|
80 | }
|
81 | if (options.flags & Propagate.DEADLINE) {
|
82 | this.trace(
|
83 | 'Propagating deadline from parent: ' +
|
84 | options.parentCall.getDeadline()
|
85 | );
|
86 | this.deadline = minDeadline(
|
87 | this.deadline,
|
88 | options.parentCall.getDeadline()
|
89 | );
|
90 | }
|
91 | }
|
92 | this.trace('Created');
|
93 | this.runDeadlineTimer();
|
94 | }
|
95 |
|
96 | private trace(text: string): void {
|
97 | logging.trace(
|
98 | LogVerbosity.DEBUG,
|
99 | TRACER_NAME,
|
100 | '[' + this.callNumber + '] ' + text
|
101 | );
|
102 | }
|
103 |
|
104 | private runDeadlineTimer() {
|
105 | clearTimeout(this.deadlineTimer);
|
106 | this.deadlineStartTime = new Date();
|
107 | this.trace('Deadline: ' + deadlineToString(this.deadline));
|
108 | const timeout = getRelativeTimeout(this.deadline);
|
109 | if (timeout !== Infinity) {
|
110 | this.trace('Deadline will be reached in ' + timeout + 'ms');
|
111 | const handleDeadline = () => {
|
112 | if (!this.deadlineStartTime) {
|
113 | this.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
|
114 | return;
|
115 | }
|
116 | const deadlineInfo: string[] = [];
|
117 | const deadlineEndTime = new Date();
|
118 | deadlineInfo.push(`Deadline exceeded after ${formatDateDifference(this.deadlineStartTime, deadlineEndTime)}`);
|
119 | if (this.configReceivedTime) {
|
120 | if (this.configReceivedTime > this.deadlineStartTime) {
|
121 | deadlineInfo.push(`name resolution: ${formatDateDifference(this.deadlineStartTime, this.configReceivedTime)}`);
|
122 | }
|
123 | if (this.childStartTime) {
|
124 | if (this.childStartTime > this.configReceivedTime) {
|
125 | deadlineInfo.push(`metadata filters: ${formatDateDifference(this.configReceivedTime, this.childStartTime)}`);
|
126 | }
|
127 | } else {
|
128 | deadlineInfo.push('waiting for metadata filters');
|
129 | }
|
130 | } else {
|
131 | deadlineInfo.push('waiting for name resolution');
|
132 | }
|
133 | if (this.child) {
|
134 | deadlineInfo.push(...this.child.getDeadlineInfo());
|
135 | }
|
136 | this.cancelWithStatus(Status.DEADLINE_EXCEEDED, deadlineInfo.join(','));
|
137 | };
|
138 | if (timeout <= 0) {
|
139 | process.nextTick(handleDeadline);
|
140 | } else {
|
141 | this.deadlineTimer = setTimeout(handleDeadline, timeout);
|
142 | }
|
143 | }
|
144 | }
|
145 |
|
146 | private outputStatus(status: StatusObject) {
|
147 | if (!this.ended) {
|
148 | this.ended = true;
|
149 | if (!this.filterStack) {
|
150 | this.filterStack = this.filterStackFactory.createFilter();
|
151 | }
|
152 | clearTimeout(this.deadlineTimer);
|
153 | const filteredStatus = this.filterStack.receiveTrailers(status);
|
154 | this.trace(
|
155 | 'ended with status: code=' +
|
156 | filteredStatus.code +
|
157 | ' details="' +
|
158 | filteredStatus.details +
|
159 | '"'
|
160 | );
|
161 | this.statusWatchers.forEach(watcher => watcher(filteredStatus));
|
162 | process.nextTick(() => {
|
163 | this.listener?.onReceiveStatus(filteredStatus);
|
164 | });
|
165 | }
|
166 | }
|
167 |
|
168 | private sendMessageOnChild(context: MessageContext, message: Buffer): void {
|
169 | if (!this.child) {
|
170 | throw new Error('sendMessageonChild called with child not populated');
|
171 | }
|
172 | const child = this.child;
|
173 | this.writeFilterPending = true;
|
174 | this.filterStack!.sendMessage(
|
175 | Promise.resolve({ message: message, flags: context.flags })
|
176 | ).then(
|
177 | filteredMessage => {
|
178 | this.writeFilterPending = false;
|
179 | child.sendMessageWithContext(context, filteredMessage.message);
|
180 | if (this.pendingHalfClose) {
|
181 | child.halfClose();
|
182 | }
|
183 | },
|
184 | (status: StatusObject) => {
|
185 | this.cancelWithStatus(status.code, status.details);
|
186 | }
|
187 | );
|
188 | }
|
189 |
|
190 | getConfig(): void {
|
191 | if (this.ended) {
|
192 | return;
|
193 | }
|
194 | if (!this.metadata || !this.listener) {
|
195 | throw new Error('getConfig called before start');
|
196 | }
|
197 | const configResult = this.channel.getConfig(this.method, this.metadata);
|
198 | if (configResult.type === 'NONE') {
|
199 | this.channel.queueCallForConfig(this);
|
200 | return;
|
201 | } else if (configResult.type === 'ERROR') {
|
202 | if (this.metadata.getOptions().waitForReady) {
|
203 | this.channel.queueCallForConfig(this);
|
204 | } else {
|
205 | this.outputStatus(configResult.error);
|
206 | }
|
207 | return;
|
208 | }
|
209 |
|
210 | this.configReceivedTime = new Date();
|
211 | const config = configResult.config;
|
212 | if (config.status !== Status.OK) {
|
213 | const { code, details } = restrictControlPlaneStatusCode(
|
214 | config.status,
|
215 | 'Failed to route call to method ' + this.method
|
216 | );
|
217 | this.outputStatus({
|
218 | code: code,
|
219 | details: details,
|
220 | metadata: new Metadata(),
|
221 | });
|
222 | return;
|
223 | }
|
224 |
|
225 | if (config.methodConfig.timeout) {
|
226 | const configDeadline = new Date();
|
227 | configDeadline.setSeconds(
|
228 | configDeadline.getSeconds() + config.methodConfig.timeout.seconds
|
229 | );
|
230 | configDeadline.setMilliseconds(
|
231 | configDeadline.getMilliseconds() +
|
232 | config.methodConfig.timeout.nanos / 1_000_000
|
233 | );
|
234 | this.deadline = minDeadline(this.deadline, configDeadline);
|
235 | this.runDeadlineTimer();
|
236 | }
|
237 |
|
238 | this.filterStackFactory.push(config.dynamicFilterFactories);
|
239 | this.filterStack = this.filterStackFactory.createFilter();
|
240 | this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(
|
241 | filteredMetadata => {
|
242 | this.child = this.channel.createInnerCall(
|
243 | config,
|
244 | this.method,
|
245 | this.host,
|
246 | this.credentials,
|
247 | this.deadline
|
248 | );
|
249 | this.trace('Created child [' + this.child.getCallNumber() + ']');
|
250 | this.childStartTime = new Date();
|
251 | this.child.start(filteredMetadata, {
|
252 | onReceiveMetadata: metadata => {
|
253 | this.trace('Received metadata');
|
254 | this.listener!.onReceiveMetadata(
|
255 | this.filterStack!.receiveMetadata(metadata)
|
256 | );
|
257 | },
|
258 | onReceiveMessage: message => {
|
259 | this.trace('Received message');
|
260 | this.readFilterPending = true;
|
261 | this.filterStack!.receiveMessage(message).then(
|
262 | filteredMesssage => {
|
263 | this.trace('Finished filtering received message');
|
264 | this.readFilterPending = false;
|
265 | this.listener!.onReceiveMessage(filteredMesssage);
|
266 | if (this.pendingChildStatus) {
|
267 | this.outputStatus(this.pendingChildStatus);
|
268 | }
|
269 | },
|
270 | (status: StatusObject) => {
|
271 | this.cancelWithStatus(status.code, status.details);
|
272 | }
|
273 | );
|
274 | },
|
275 | onReceiveStatus: status => {
|
276 | this.trace('Received status');
|
277 | if (this.readFilterPending) {
|
278 | this.pendingChildStatus = status;
|
279 | } else {
|
280 | this.outputStatus(status);
|
281 | }
|
282 | },
|
283 | });
|
284 | if (this.readPending) {
|
285 | this.child.startRead();
|
286 | }
|
287 | if (this.pendingMessage) {
|
288 | this.sendMessageOnChild(
|
289 | this.pendingMessage.context,
|
290 | this.pendingMessage.message
|
291 | );
|
292 | } else if (this.pendingHalfClose) {
|
293 | this.child.halfClose();
|
294 | }
|
295 | },
|
296 | (status: StatusObject) => {
|
297 | this.outputStatus(status);
|
298 | }
|
299 | );
|
300 | }
|
301 |
|
302 | reportResolverError(status: StatusObject) {
|
303 | if (this.metadata?.getOptions().waitForReady) {
|
304 | this.channel.queueCallForConfig(this);
|
305 | } else {
|
306 | this.outputStatus(status);
|
307 | }
|
308 | }
|
309 | cancelWithStatus(status: Status, details: string): void {
|
310 | this.trace(
|
311 | 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
|
312 | );
|
313 | this.child?.cancelWithStatus(status, details);
|
314 | this.outputStatus({
|
315 | code: status,
|
316 | details: details,
|
317 | metadata: new Metadata(),
|
318 | });
|
319 | }
|
320 | getPeer(): string {
|
321 | return this.child?.getPeer() ?? this.channel.getTarget();
|
322 | }
|
323 | start(metadata: Metadata, listener: InterceptingListener): void {
|
324 | this.trace('start called');
|
325 | this.metadata = metadata.clone();
|
326 | this.listener = listener;
|
327 | this.getConfig();
|
328 | }
|
329 | sendMessageWithContext(context: MessageContext, message: Buffer): void {
|
330 | this.trace('write() called with message of length ' + message.length);
|
331 | if (this.child) {
|
332 | this.sendMessageOnChild(context, message);
|
333 | } else {
|
334 | this.pendingMessage = { context, message };
|
335 | }
|
336 | }
|
337 | startRead(): void {
|
338 | this.trace('startRead called');
|
339 | if (this.child) {
|
340 | this.child.startRead();
|
341 | } else {
|
342 | this.readPending = true;
|
343 | }
|
344 | }
|
345 | halfClose(): void {
|
346 | this.trace('halfClose called');
|
347 | if (this.child && !this.writeFilterPending) {
|
348 | this.child.halfClose();
|
349 | } else {
|
350 | this.pendingHalfClose = true;
|
351 | }
|
352 | }
|
353 | setCredentials(credentials: CallCredentials): void {
|
354 | this.credentials = this.credentials.compose(credentials);
|
355 | }
|
356 |
|
357 | addStatusWatcher(watcher: (status: StatusObject) => void) {
|
358 | this.statusWatchers.push(watcher);
|
359 | }
|
360 |
|
361 | getCallNumber(): number {
|
362 | return this.callNumber;
|
363 | }
|
364 | }
|