1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from './call-credentials';
|
19 | import {
|
20 | Call,
|
21 | CallStreamOptions,
|
22 | InterceptingListener,
|
23 | MessageContext,
|
24 | StatusObject,
|
25 | } from './call-interface';
|
26 | import { LogVerbosity, Propagate, Status } from './constants';
|
27 | import {
|
28 | Deadline,
|
29 | deadlineToString,
|
30 | getRelativeTimeout,
|
31 | minDeadline,
|
32 | } from './deadline';
|
33 | import { FilterStack, FilterStackFactory } from './filter-stack';
|
34 | import { InternalChannel } from './internal-channel';
|
35 | import { Metadata } from './metadata';
|
36 | import * as logging from './logging';
|
37 | import { restrictControlPlaneStatusCode } from './control-plane-status';
|
38 |
|
39 | const TRACER_NAME = 'resolving_call';
|
40 |
|
41 | export class ResolvingCall implements Call {
|
42 | private child: Call | null = null;
|
43 | private readPending = false;
|
44 | private pendingMessage: { context: MessageContext; message: Buffer } | null =
|
45 | null;
|
46 | private pendingHalfClose = false;
|
47 | private ended = false;
|
48 | private readFilterPending = false;
|
49 | private writeFilterPending = false;
|
50 | private pendingChildStatus: StatusObject | null = null;
|
51 | private metadata: Metadata | null = null;
|
52 | private listener: InterceptingListener | null = null;
|
53 | private deadline: Deadline;
|
54 | private host: string;
|
55 | private statusWatchers: ((status: StatusObject) => void)[] = [];
|
56 | private deadlineTimer: NodeJS.Timeout = setTimeout(() => {}, 0);
|
57 | private filterStack: FilterStack | null = null;
|
58 |
|
59 | constructor(
|
60 | private readonly channel: InternalChannel,
|
61 | private readonly method: string,
|
62 | options: CallStreamOptions,
|
63 | private readonly filterStackFactory: FilterStackFactory,
|
64 | private credentials: CallCredentials,
|
65 | private callNumber: number
|
66 | ) {
|
67 | this.deadline = options.deadline;
|
68 | this.host = options.host;
|
69 | if (options.parentCall) {
|
70 | if (options.flags & Propagate.CANCELLATION) {
|
71 | options.parentCall.on('cancelled', () => {
|
72 | this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
|
73 | });
|
74 | }
|
75 | if (options.flags & Propagate.DEADLINE) {
|
76 | this.trace(
|
77 | 'Propagating deadline from parent: ' +
|
78 | options.parentCall.getDeadline()
|
79 | );
|
80 | this.deadline = minDeadline(
|
81 | this.deadline,
|
82 | options.parentCall.getDeadline()
|
83 | );
|
84 | }
|
85 | }
|
86 | this.trace('Created');
|
87 | this.runDeadlineTimer();
|
88 | }
|
89 |
|
90 | private trace(text: string): void {
|
91 | logging.trace(
|
92 | LogVerbosity.DEBUG,
|
93 | TRACER_NAME,
|
94 | '[' + this.callNumber + '] ' + text
|
95 | );
|
96 | }
|
97 |
|
98 | private runDeadlineTimer() {
|
99 | clearTimeout(this.deadlineTimer);
|
100 | this.trace('Deadline: ' + deadlineToString(this.deadline));
|
101 | const timeout = getRelativeTimeout(this.deadline);
|
102 | if (timeout !== Infinity) {
|
103 | this.trace('Deadline will be reached in ' + timeout + 'ms');
|
104 | const handleDeadline = () => {
|
105 | this.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
|
106 | };
|
107 | if (timeout <= 0) {
|
108 | process.nextTick(handleDeadline);
|
109 | } else {
|
110 | this.deadlineTimer = setTimeout(handleDeadline, timeout);
|
111 | }
|
112 | }
|
113 | }
|
114 |
|
115 | private outputStatus(status: StatusObject) {
|
116 | if (!this.ended) {
|
117 | this.ended = true;
|
118 | if (!this.filterStack) {
|
119 | this.filterStack = this.filterStackFactory.createFilter();
|
120 | }
|
121 | clearTimeout(this.deadlineTimer);
|
122 | const filteredStatus = this.filterStack.receiveTrailers(status);
|
123 | this.trace(
|
124 | 'ended with status: code=' +
|
125 | filteredStatus.code +
|
126 | ' details="' +
|
127 | filteredStatus.details +
|
128 | '"'
|
129 | );
|
130 | this.statusWatchers.forEach(watcher => watcher(filteredStatus));
|
131 | process.nextTick(() => {
|
132 | this.listener?.onReceiveStatus(filteredStatus);
|
133 | });
|
134 | }
|
135 | }
|
136 |
|
137 | private sendMessageOnChild(context: MessageContext, message: Buffer): void {
|
138 | if (!this.child) {
|
139 | throw new Error('sendMessageonChild called with child not populated');
|
140 | }
|
141 | const child = this.child;
|
142 | this.writeFilterPending = true;
|
143 | this.filterStack!.sendMessage(
|
144 | Promise.resolve({ message: message, flags: context.flags })
|
145 | ).then(
|
146 | filteredMessage => {
|
147 | this.writeFilterPending = false;
|
148 | child.sendMessageWithContext(context, filteredMessage.message);
|
149 | if (this.pendingHalfClose) {
|
150 | child.halfClose();
|
151 | }
|
152 | },
|
153 | (status: StatusObject) => {
|
154 | this.cancelWithStatus(status.code, status.details);
|
155 | }
|
156 | );
|
157 | }
|
158 |
|
159 | getConfig(): void {
|
160 | if (this.ended) {
|
161 | return;
|
162 | }
|
163 | if (!this.metadata || !this.listener) {
|
164 | throw new Error('getConfig called before start');
|
165 | }
|
166 | const configResult = this.channel.getConfig(this.method, this.metadata);
|
167 | if (configResult.type === 'NONE') {
|
168 | this.channel.queueCallForConfig(this);
|
169 | return;
|
170 | } else if (configResult.type === 'ERROR') {
|
171 | if (this.metadata.getOptions().waitForReady) {
|
172 | this.channel.queueCallForConfig(this);
|
173 | } else {
|
174 | this.outputStatus(configResult.error);
|
175 | }
|
176 | return;
|
177 | }
|
178 |
|
179 | const config = configResult.config;
|
180 | if (config.status !== Status.OK) {
|
181 | const { code, details } = restrictControlPlaneStatusCode(
|
182 | config.status,
|
183 | 'Failed to route call to method ' + this.method
|
184 | );
|
185 | this.outputStatus({
|
186 | code: code,
|
187 | details: details,
|
188 | metadata: new Metadata(),
|
189 | });
|
190 | return;
|
191 | }
|
192 |
|
193 | if (config.methodConfig.timeout) {
|
194 | const configDeadline = new Date();
|
195 | configDeadline.setSeconds(
|
196 | configDeadline.getSeconds() + config.methodConfig.timeout.seconds
|
197 | );
|
198 | configDeadline.setMilliseconds(
|
199 | configDeadline.getMilliseconds() +
|
200 | config.methodConfig.timeout.nanos / 1_000_000
|
201 | );
|
202 | this.deadline = minDeadline(this.deadline, configDeadline);
|
203 | this.runDeadlineTimer();
|
204 | }
|
205 |
|
206 | this.filterStackFactory.push(config.dynamicFilterFactories);
|
207 | this.filterStack = this.filterStackFactory.createFilter();
|
208 | this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(
|
209 | filteredMetadata => {
|
210 | this.child = this.channel.createInnerCall(
|
211 | config,
|
212 | this.method,
|
213 | this.host,
|
214 | this.credentials,
|
215 | this.deadline
|
216 | );
|
217 | this.trace('Created child [' + this.child.getCallNumber() + ']');
|
218 | this.child.start(filteredMetadata, {
|
219 | onReceiveMetadata: metadata => {
|
220 | this.trace('Received metadata');
|
221 | this.listener!.onReceiveMetadata(
|
222 | this.filterStack!.receiveMetadata(metadata)
|
223 | );
|
224 | },
|
225 | onReceiveMessage: message => {
|
226 | this.trace('Received message');
|
227 | this.readFilterPending = true;
|
228 | this.filterStack!.receiveMessage(message).then(
|
229 | filteredMesssage => {
|
230 | this.trace('Finished filtering received message');
|
231 | this.readFilterPending = false;
|
232 | this.listener!.onReceiveMessage(filteredMesssage);
|
233 | if (this.pendingChildStatus) {
|
234 | this.outputStatus(this.pendingChildStatus);
|
235 | }
|
236 | },
|
237 | (status: StatusObject) => {
|
238 | this.cancelWithStatus(status.code, status.details);
|
239 | }
|
240 | );
|
241 | },
|
242 | onReceiveStatus: status => {
|
243 | this.trace('Received status');
|
244 | if (this.readFilterPending) {
|
245 | this.pendingChildStatus = status;
|
246 | } else {
|
247 | this.outputStatus(status);
|
248 | }
|
249 | },
|
250 | });
|
251 | if (this.readPending) {
|
252 | this.child.startRead();
|
253 | }
|
254 | if (this.pendingMessage) {
|
255 | this.sendMessageOnChild(
|
256 | this.pendingMessage.context,
|
257 | this.pendingMessage.message
|
258 | );
|
259 | } else if (this.pendingHalfClose) {
|
260 | this.child.halfClose();
|
261 | }
|
262 | },
|
263 | (status: StatusObject) => {
|
264 | this.outputStatus(status);
|
265 | }
|
266 | );
|
267 | }
|
268 |
|
269 | reportResolverError(status: StatusObject) {
|
270 | if (this.metadata?.getOptions().waitForReady) {
|
271 | this.channel.queueCallForConfig(this);
|
272 | } else {
|
273 | this.outputStatus(status);
|
274 | }
|
275 | }
|
276 | cancelWithStatus(status: Status, details: string): void {
|
277 | this.trace(
|
278 | 'cancelWithStatus code: ' + status + ' details: "' + details + '"'
|
279 | );
|
280 | this.child?.cancelWithStatus(status, details);
|
281 | this.outputStatus({
|
282 | code: status,
|
283 | details: details,
|
284 | metadata: new Metadata(),
|
285 | });
|
286 | }
|
287 | getPeer(): string {
|
288 | return this.child?.getPeer() ?? this.channel.getTarget();
|
289 | }
|
290 | start(metadata: Metadata, listener: InterceptingListener): void {
|
291 | this.trace('start called');
|
292 | this.metadata = metadata.clone();
|
293 | this.listener = listener;
|
294 | this.getConfig();
|
295 | }
|
296 | sendMessageWithContext(context: MessageContext, message: Buffer): void {
|
297 | this.trace('write() called with message of length ' + message.length);
|
298 | if (this.child) {
|
299 | this.sendMessageOnChild(context, message);
|
300 | } else {
|
301 | this.pendingMessage = { context, message };
|
302 | }
|
303 | }
|
304 | startRead(): void {
|
305 | this.trace('startRead called');
|
306 | if (this.child) {
|
307 | this.child.startRead();
|
308 | } else {
|
309 | this.readPending = true;
|
310 | }
|
311 | }
|
312 | halfClose(): void {
|
313 | this.trace('halfClose called');
|
314 | if (this.child && !this.writeFilterPending) {
|
315 | this.child.halfClose();
|
316 | } else {
|
317 | this.pendingHalfClose = true;
|
318 | }
|
319 | }
|
320 | setCredentials(credentials: CallCredentials): void {
|
321 | this.credentials = this.credentials.compose(credentials);
|
322 | }
|
323 |
|
324 | addStatusWatcher(watcher: (status: StatusObject) => void) {
|
325 | this.statusWatchers.push(watcher);
|
326 | }
|
327 |
|
328 | getCallNumber(): number {
|
329 | return this.callNumber;
|
330 | }
|
331 | }
|