1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { CallCredentials } from "./call-credentials";
|
19 | import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
|
20 | import { LogVerbosity, Propagate, Status } from "./constants";
|
21 | import { Deadline, deadlineToString, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
|
22 | import { FilterStack, FilterStackFactory } from "./filter-stack";
|
23 | import { InternalChannel } from "./internal-channel";
|
24 | import { Metadata } from "./metadata";
|
25 | import * as logging from './logging';
|
26 | import { restrictControlPlaneStatusCode } from "./control-plane-status";
|
27 |
|
28 | const TRACER_NAME = 'resolving_call';
|
29 |
|
30 | export class ResolvingCall implements Call {
|
31 | private child: Call | null = null;
|
32 | private readPending = false;
|
33 | private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
|
34 | private pendingHalfClose = false;
|
35 | private ended = false;
|
36 | private readFilterPending = false;
|
37 | private writeFilterPending = false;
|
38 | private pendingChildStatus: StatusObject | null = null;
|
39 | private metadata: Metadata | null = null;
|
40 | private listener: InterceptingListener | null = null;
|
41 | private deadline: Deadline;
|
42 | private host: string;
|
43 | private statusWatchers: ((status: StatusObject) => void)[] = [];
|
44 | private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
|
45 | private filterStack: FilterStack | null = null;
|
46 |
|
47 | constructor(
|
48 | private readonly channel: InternalChannel,
|
49 | private readonly method: string,
|
50 | options: CallStreamOptions,
|
51 | private readonly filterStackFactory: FilterStackFactory,
|
52 | private credentials: CallCredentials,
|
53 | private callNumber: number
|
54 | ) {
|
55 | this.deadline = options.deadline;
|
56 | this.host = options.host;
|
57 | if (options.parentCall) {
|
58 | if (options.flags & Propagate.CANCELLATION) {
|
59 | options.parentCall.on('cancelled', () => {
|
60 | this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
|
61 | });
|
62 | }
|
63 | if (options.flags & Propagate.DEADLINE) {
|
64 | this.trace('Propagating deadline from parent: ' + options.parentCall.getDeadline());
|
65 | this.deadline = minDeadline(this.deadline, options.parentCall.getDeadline());
|
66 | }
|
67 | }
|
68 | this.trace('Created');
|
69 | this.runDeadlineTimer();
|
70 | }
|
71 |
|
72 | private trace(text: string): void {
|
73 | logging.trace(
|
74 | LogVerbosity.DEBUG,
|
75 | TRACER_NAME,
|
76 | '[' + this.callNumber + '] ' + text
|
77 | );
|
78 | }
|
79 |
|
80 | private runDeadlineTimer() {
|
81 | clearTimeout(this.deadlineTimer);
|
82 | this.trace('Deadline: ' + deadlineToString(this.deadline));
|
83 | const timeout = getRelativeTimeout(this.deadline);
|
84 | if (timeout !== Infinity) {
|
85 | this.trace('Deadline will be reached in ' + timeout + 'ms');
|
86 | const handleDeadline = () => {
|
87 | this.cancelWithStatus(
|
88 | Status.DEADLINE_EXCEEDED,
|
89 | 'Deadline exceeded'
|
90 | );
|
91 | }
|
92 | if (timeout <= 0) {
|
93 | process.nextTick(handleDeadline);
|
94 | } else {
|
95 | this.deadlineTimer = setTimeout(handleDeadline, timeout);
|
96 | }
|
97 | }
|
98 | }
|
99 |
|
100 | private outputStatus(status: StatusObject) {
|
101 | if (!this.ended) {
|
102 | this.ended = true;
|
103 | if (!this.filterStack) {
|
104 | this.filterStack = this.filterStackFactory.createFilter();
|
105 | }
|
106 | clearTimeout(this.deadlineTimer);
|
107 | const filteredStatus = this.filterStack.receiveTrailers(status);
|
108 | this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
|
109 | this.statusWatchers.forEach(watcher => watcher(filteredStatus));
|
110 | process.nextTick(() => {
|
111 | this.listener?.onReceiveStatus(filteredStatus);
|
112 | });
|
113 | }
|
114 | }
|
115 |
|
116 | private sendMessageOnChild(context: MessageContext, message: Buffer): void {
|
117 | if (!this.child) {
|
118 | throw new Error('sendMessageonChild called with child not populated');
|
119 | }
|
120 | const child = this.child;
|
121 | this.writeFilterPending = true;
|
122 | this.filterStack!.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => {
|
123 | this.writeFilterPending = false;
|
124 | child.sendMessageWithContext(context, filteredMessage.message);
|
125 | if (this.pendingHalfClose) {
|
126 | child.halfClose();
|
127 | }
|
128 | }, (status: StatusObject) => {
|
129 | this.cancelWithStatus(status.code, status.details);
|
130 | });
|
131 | }
|
132 |
|
133 | getConfig(): void {
|
134 | if (this.ended) {
|
135 | return;
|
136 | }
|
137 | if (!this.metadata || !this.listener) {
|
138 | throw new Error('getConfig called before start');
|
139 | }
|
140 | const configResult = this.channel.getConfig(this.method, this.metadata);
|
141 | if (configResult.type === 'NONE') {
|
142 | this.channel.queueCallForConfig(this);
|
143 | return;
|
144 | } else if (configResult.type === 'ERROR') {
|
145 | if (this.metadata.getOptions().waitForReady) {
|
146 | this.channel.queueCallForConfig(this);
|
147 | } else {
|
148 | this.outputStatus(configResult.error);
|
149 | }
|
150 | return;
|
151 | }
|
152 |
|
153 | const config = configResult.config;
|
154 | if (config.status !== Status.OK) {
|
155 | const {code, details} = restrictControlPlaneStatusCode(config.status, 'Failed to route call to method ' + this.method);
|
156 | this.outputStatus({
|
157 | code: code,
|
158 | details: details,
|
159 | metadata: new Metadata()
|
160 | });
|
161 | return;
|
162 | }
|
163 |
|
164 | if (config.methodConfig.timeout) {
|
165 | const configDeadline = new Date();
|
166 | configDeadline.setSeconds(
|
167 | configDeadline.getSeconds() + config.methodConfig.timeout.seconds
|
168 | );
|
169 | configDeadline.setMilliseconds(
|
170 | configDeadline.getMilliseconds() +
|
171 | config.methodConfig.timeout.nanos / 1_000_000
|
172 | );
|
173 | this.deadline = minDeadline(this.deadline, configDeadline);
|
174 | this.runDeadlineTimer();
|
175 | }
|
176 |
|
177 | this.filterStackFactory.push(config.dynamicFilterFactories);
|
178 | this.filterStack = this.filterStackFactory.createFilter();
|
179 | this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
|
180 | this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
|
181 | this.trace('Created child [' + this.child.getCallNumber() + ']')
|
182 | this.child.start(filteredMetadata, {
|
183 | onReceiveMetadata: metadata => {
|
184 | this.trace('Received metadata')
|
185 | this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
|
186 | },
|
187 | onReceiveMessage: message => {
|
188 | this.trace('Received message');
|
189 | this.readFilterPending = true;
|
190 | this.filterStack!.receiveMessage(message).then(filteredMesssage => {
|
191 | this.trace('Finished filtering received message');
|
192 | this.readFilterPending = false;
|
193 | this.listener!.onReceiveMessage(filteredMesssage);
|
194 | if (this.pendingChildStatus) {
|
195 | this.outputStatus(this.pendingChildStatus);
|
196 | }
|
197 | }, (status: StatusObject) => {
|
198 | this.cancelWithStatus(status.code, status.details);
|
199 | });
|
200 | },
|
201 | onReceiveStatus: status => {
|
202 | this.trace('Received status');
|
203 | if (this.readFilterPending) {
|
204 | this.pendingChildStatus = status;
|
205 | } else {
|
206 | this.outputStatus(status);
|
207 | }
|
208 | }
|
209 | });
|
210 | if (this.readPending) {
|
211 | this.child.startRead();
|
212 | }
|
213 | if (this.pendingMessage) {
|
214 | this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message);
|
215 | } else if (this.pendingHalfClose) {
|
216 | this.child.halfClose();
|
217 | }
|
218 | }, (status: StatusObject) => {
|
219 | this.outputStatus(status);
|
220 | })
|
221 | }
|
222 |
|
223 | reportResolverError(status: StatusObject) {
|
224 | if (this.metadata?.getOptions().waitForReady) {
|
225 | this.channel.queueCallForConfig(this);
|
226 | } else {
|
227 | this.outputStatus(status);
|
228 | }
|
229 | }
|
230 | cancelWithStatus(status: Status, details: string): void {
|
231 | this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
|
232 | this.child?.cancelWithStatus(status, details);
|
233 | this.outputStatus({code: status, details: details, metadata: new Metadata()});
|
234 | }
|
235 | getPeer(): string {
|
236 | return this.child?.getPeer() ?? this.channel.getTarget();
|
237 | }
|
238 | start(metadata: Metadata, listener: InterceptingListener): void {
|
239 | this.trace('start called');
|
240 | this.metadata = metadata.clone();
|
241 | this.listener = listener;
|
242 | this.getConfig();
|
243 | }
|
244 | sendMessageWithContext(context: MessageContext, message: Buffer): void {
|
245 | this.trace('write() called with message of length ' + message.length);
|
246 | if (this.child) {
|
247 | this.sendMessageOnChild(context, message);
|
248 | } else {
|
249 | this.pendingMessage = {context, message};
|
250 | }
|
251 | }
|
252 | startRead(): void {
|
253 | this.trace('startRead called');
|
254 | if (this.child) {
|
255 | this.child.startRead();
|
256 | } else {
|
257 | this.readPending = true;
|
258 | }
|
259 | }
|
260 | halfClose(): void {
|
261 | this.trace('halfClose called');
|
262 | if (this.child && !this.writeFilterPending) {
|
263 | this.child.halfClose();
|
264 | } else {
|
265 | this.pendingHalfClose = true;
|
266 | }
|
267 | }
|
268 | setCredentials(credentials: CallCredentials): void {
|
269 | this.credentials = this.credentials.compose(credentials);
|
270 | }
|
271 |
|
272 | addStatusWatcher(watcher: (status: StatusObject) => void) {
|
273 | this.statusWatchers.push(watcher);
|
274 | }
|
275 |
|
276 | getCallNumber(): number {
|
277 | return this.callNumber;
|
278 | }
|
279 | } |
\ | No newline at end of file |