1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
11 |
12 |
13 |
14 |
15 |
16 |
17 |
18 | import { CallCredentials } from "./call-credentials";
19 | import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
20 | import { LogVerbosity, Propagate, Status } from "./constants";
21 | import { Deadline, deadlineToString, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
22 | import { FilterStack, FilterStackFactory } from "./filter-stack";
23 | import { InternalChannel } from "./internal-channel";
24 | import { Metadata } from "./metadata";
25 | import * as logging from './logging';
26 | import { restrictControlPlaneStatusCode } from "./control-plane-status";
27 |
/** Tracer name passed to `logging.trace` for every line logged by this file. */
const TRACER_NAME = 'resolving_call';

/**
 * A `Call` that postpones creating its underlying child call until the
 * channel can supply a config for the requested method.
 *
 * Operations requested before the child exists (a read, a message, a
 * half-close) are recorded in `pending*` fields and replayed on the child once
 * it has been created. Outgoing metadata and messages, and incoming metadata,
 * messages and the final status, are all passed through a `FilterStack`.
 */
export class ResolvingCall implements Call {
  /** The inner call created by the channel; `null` until `getConfig` succeeds. */
  private child: Call | null = null;
  /** Set when `startRead` was called before the child existed. */
  private readPending = false;
  /**
   * Message handed to `sendMessageWithContext` before the child existed.
   * Only one message is held; a later call overwrites the previous one.
   */
  private pendingMessage: { context: MessageContext; message: Buffer } | null = null;
  /** Set when `halfClose` was requested but could not yet be forwarded. */
  private pendingHalfClose = false;
  /** True once a final status has been output; later statuses are ignored. */
  private ended = false;
  /** True while a received message is still being processed by the filter stack. */
  private readFilterPending = false;
  /** True while an outgoing message is still being processed by the filter stack. */
  private writeFilterPending = false;
  /** Status received from the child while `readFilterPending` was set. */
  private pendingChildStatus: StatusObject | null = null;
  /** Clone of the metadata passed to `start`; `null` before `start`. */
  private metadata: Metadata | null = null;
  /** Listener passed to `start`; `null` before `start`. */
  private listener: InterceptingListener | null = null;
  /**
   * Effective deadline: starts as `options.deadline` and may be tightened by
   * the parent call's deadline and by the method config timeout.
   */
  private deadline: Deadline;
  private host: string;
  /** Callbacks invoked synchronously with the filtered final status. */
  private statusWatchers: ((status: StatusObject) => void)[] = [];
  // Initialised with a no-op timer so that clearTimeout always receives a
  // timer handle (presumably to avoid making the field nullable).
  private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0);
  /** Created in `getConfig`, or lazily in `outputStatus` if the call ends first. */
  private filterStack: FilterStack | null = null;

  /**
   * @param channel Channel used to look up configs and create the inner call.
   * @param method Method name this call targets.
   * @param options Supplies the deadline, host, and optional parent call with
   *     its propagation flags.
   * @param filterStackFactory Factory for this call's filter stack.
   * @param credentials Call credentials forwarded to the inner call.
   * @param callNumber Identifier used as a prefix in trace output.
   */
  constructor(
    private readonly channel: InternalChannel,
    private readonly method: string,
    options: CallStreamOptions,
    private readonly filterStackFactory: FilterStackFactory,
    private credentials: CallCredentials,
    private callNumber: number
  ) {
    this.deadline = options.deadline;
    this.host = options.host;
    if (options.parentCall) {
      if (options.flags & Propagate.CANCELLATION) {
        options.parentCall.on('cancelled', () => {
          this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call');
        });
      }
      if (options.flags & Propagate.DEADLINE) {
        this.trace('Propagating deadline from parent: ' + options.parentCall.getDeadline());
        this.deadline = minDeadline(this.deadline, options.parentCall.getDeadline());
      }
    }
    this.trace('Created');
    this.runDeadlineTimer();
  }

  /**
   * Logs `text` at DEBUG verbosity under `TRACER_NAME`, prefixed with this
   * call's number.
   */
  private trace(text: string): void {
    logging.trace(
      LogVerbosity.DEBUG,
      TRACER_NAME,
      '[' + this.callNumber + '] ' + text
    );
  }

  /**
   * (Re)starts the deadline timer from the current value of `this.deadline`.
   * Any previously scheduled timer is cleared first. When the deadline is
   * finite, the call is cancelled with `DEADLINE_EXCEEDED` once it is reached.
   */
  private runDeadlineTimer() {
    clearTimeout(this.deadlineTimer);
    this.trace('Deadline: ' + deadlineToString(this.deadline));
    const timeout = getRelativeTimeout(this.deadline);
    if (timeout !== Infinity) {
      this.trace('Deadline will be reached in ' + timeout + 'ms');
      const handleDeadline = () => {
        this.cancelWithStatus(
          Status.DEADLINE_EXCEEDED,
          'Deadline exceeded'
        );
      };
      if (timeout <= 0) {
        // Deadline already passed: fail on the next tick instead of
        // synchronously inside the caller.
        process.nextTick(handleDeadline);
      } else {
        this.deadlineTimer = setTimeout(handleDeadline, timeout);
      }
    }
  }

  /**
   * Ends the call with `status` unless it has already ended. The status is run
   * through the filter stack, reported to the status watchers synchronously,
   * and delivered to the listener on the next tick.
   */
  private outputStatus(status: StatusObject) {
    if (!this.ended) {
      this.ended = true;
      // The call can end before getConfig built a filter stack; create one so
      // the status is still filtered.
      if (!this.filterStack) {
        this.filterStack = this.filterStackFactory.createFilter();
      }
      clearTimeout(this.deadlineTimer);
      const filteredStatus = this.filterStack.receiveTrailers(status);
      this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
      this.statusWatchers.forEach(watcher => watcher(filteredStatus));
      process.nextTick(() => {
        this.listener?.onReceiveStatus(filteredStatus);
      });
    }
  }

  /**
   * Filters `message` and forwards the result to the child call. A half-close
   * requested while the filter was running is forwarded right after the
   * message. A filter rejection cancels the call with the rejected status.
   *
   * @throws Error if no child call has been created yet.
   */
  private sendMessageOnChild(context: MessageContext, message: Buffer): void {
    if (!this.child) {
      throw new Error('sendMessageonChild called with child not populated');
    }
    const child = this.child;
    this.writeFilterPending = true;
    this.filterStack!.sendMessage(Promise.resolve({ message: message, flags: context.flags })).then((filteredMessage) => {
      this.writeFilterPending = false;
      child.sendMessageWithContext(context, filteredMessage.message);
      if (this.pendingHalfClose) {
        child.halfClose();
      }
    }, (status: StatusObject) => {
      this.cancelWithStatus(status.code, status.details);
    });
  }

  /**
   * Asks the channel for a config for this call's method and, if one is
   * available, creates and starts the child call.
   *
   * - No config yet: the call is queued on the channel.
   * - Config error: the call is queued if the metadata options request
   *   `waitForReady`; otherwise it ends with the error.
   * - Config with a non-OK status: the call ends with the status code returned
   *   by `restrictControlPlaneStatusCode`.
   *
   * Does nothing if the call has already ended.
   *
   * @throws Error if called before `start`.
   */
  getConfig(): void {
    if (this.ended) {
      return;
    }
    if (!this.metadata || !this.listener) {
      throw new Error('getConfig called before start');
    }
    const configResult = this.channel.getConfig(this.method, this.metadata);
    if (configResult.type === 'NONE') {
      this.channel.queueCallForConfig(this);
      return;
    } else if (configResult.type === 'ERROR') {
      if (this.metadata.getOptions().waitForReady) {
        this.channel.queueCallForConfig(this);
      } else {
        this.outputStatus(configResult.error);
      }
      return;
    }

    const config = configResult.config;
    if (config.status !== Status.OK) {
      const { code, details } = restrictControlPlaneStatusCode(config.status, 'Failed to route call to method ' + this.method);
      this.outputStatus({
        code: code,
        details: details,
        metadata: new Metadata()
      });
      return;
    }

    if (config.methodConfig.timeout) {
      // Turn the relative config timeout into an absolute deadline measured
      // from now, then keep whichever deadline is earlier.
      const configDeadline = new Date();
      configDeadline.setSeconds(
        configDeadline.getSeconds() + config.methodConfig.timeout.seconds
      );
      configDeadline.setMilliseconds(
        configDeadline.getMilliseconds() +
          config.methodConfig.timeout.nanos / 1_000_000
      );
      this.deadline = minDeadline(this.deadline, configDeadline);
      this.runDeadlineTimer();
    }

    this.filterStackFactory.push(config.dynamicFilterFactories);
    this.filterStack = this.filterStackFactory.createFilter();
    this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
      this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
      this.trace('Created child [' + this.child.getCallNumber() + ']');
      this.child.start(filteredMetadata, {
        onReceiveMetadata: metadata => {
          this.trace('Received metadata');
          this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
        },
        onReceiveMessage: message => {
          this.trace('Received message');
          this.readFilterPending = true;
          this.filterStack!.receiveMessage(message).then(filteredMesssage => {
            this.trace('Finished filtering received message');
            this.readFilterPending = false;
            this.listener!.onReceiveMessage(filteredMesssage);
            // A status that arrived during filtering was held back so that it
            // is delivered after this message.
            if (this.pendingChildStatus) {
              this.outputStatus(this.pendingChildStatus);
            }
          }, (status: StatusObject) => {
            this.cancelWithStatus(status.code, status.details);
          });
        },
        onReceiveStatus: status => {
          this.trace('Received status');
          if (this.readFilterPending) {
            this.pendingChildStatus = status;
          } else {
            this.outputStatus(status);
          }
        }
      });
      // Replay whatever was requested before the child existed.
      if (this.readPending) {
        this.child.startRead();
      }
      if (this.pendingMessage) {
        // sendMessageOnChild forwards a pending half-close after the message.
        this.sendMessageOnChild(this.pendingMessage.context, this.pendingMessage.message);
      } else if (this.pendingHalfClose) {
        this.child.halfClose();
      }
    }, (status: StatusObject) => {
      this.outputStatus(status);
    });
  }

  /**
   * Handles a resolver error: the call is re-queued for a config when the
   * metadata options request `waitForReady`, and ends with `status` otherwise.
   */
  reportResolverError(status: StatusObject) {
    if (this.metadata?.getOptions().waitForReady) {
      this.channel.queueCallForConfig(this);
    } else {
      this.outputStatus(status);
    }
  }

  /**
   * Cancels the child call (if any) and ends this call with the given status
   * code, details, and empty metadata.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
    this.child?.cancelWithStatus(status, details);
    this.outputStatus({ code: status, details: details, metadata: new Metadata() });
  }

  /** Returns the child's peer, or the channel target when that is unavailable. */
  getPeer(): string {
    return this.child?.getPeer() ?? this.channel.getTarget();
  }

  /**
   * Stores a clone of `metadata` and the `listener`, then attempts to obtain a
   * config via `getConfig`.
   */
  start(metadata: Metadata, listener: InterceptingListener): void {
    this.trace('start called');
    this.metadata = metadata.clone();
    this.listener = listener;
    this.getConfig();
  }

  /**
   * Sends `message` through the child call, or stores it as the pending
   * message if the child has not been created yet.
   */
  sendMessageWithContext(context: MessageContext, message: Buffer): void {
    this.trace('write() called with message of length ' + message.length);
    if (this.child) {
      this.sendMessageOnChild(context, message);
    } else {
      this.pendingMessage = { context, message };
    }
  }

  /** Starts a read on the child, or records the request until the child exists. */
  startRead(): void {
    this.trace('startRead called');
    if (this.child) {
      this.child.startRead();
    } else {
      this.readPending = true;
    }
  }

  /**
   * Half-closes the child immediately when it exists and no outgoing message
   * is still being filtered; otherwise records the request for later.
   */
  halfClose(): void {
    this.trace('halfClose called');
    if (this.child && !this.writeFilterPending) {
      this.child.halfClose();
    } else {
      this.pendingHalfClose = true;
    }
  }

  /** Composes `credentials` onto the call's current credentials. */
  setCredentials(credentials: CallCredentials): void {
    this.credentials = this.credentials.compose(credentials);
  }

  /** Registers a callback that is invoked with the filtered final status. */
  addStatusWatcher(watcher: (status: StatusObject) => void) {
    this.statusWatchers.push(watcher);
  }

  /** Returns the call number supplied at construction. */
  getCallNumber(): number {
    return this.callNumber;
  }
}
\ | No newline at end of file |