1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { Metadata } from './metadata';
|
19 | import {
|
20 | StatusObject,
|
21 | Listener,
|
22 | MetadataListener,
|
23 | MessageListener,
|
24 | StatusListener,
|
25 | FullListener,
|
26 | InterceptingListener,
|
27 | InterceptingListenerImpl,
|
28 | isInterceptingListener,
|
29 | MessageContext,
|
30 | Call,
|
31 | } from './call-interface';
|
32 | import { Status } from './constants';
|
33 | import { Channel } from './channel';
|
34 | import { CallOptions } from './client';
|
35 | import { CallCredentials } from './call-credentials';
|
36 | import { ClientMethodDefinition } from './make-client';
|
37 | import { getErrorMessage } from './error';
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
/**
 * Error raised when interceptors are configured inconsistently, for example
 * when `getInterceptingCall` is given both interceptors and interceptor
 * providers at the same level.
 */
export class InterceptorConfigurationError extends Error {
  /**
   * @param message Human-readable description of the configuration problem.
   */
  constructor(message: string) {
    super(message);
    this.name = 'InterceptorConfigurationError';
    // Error.captureStackTrace is a V8 extension. Guard it so that engines
    // without it still surface the configuration error instead of a
    // TypeError raised from inside this constructor.
    if (typeof Error.captureStackTrace === 'function') {
      Error.captureStackTrace(this, InterceptorConfigurationError);
    }
  }
}
|
50 |
|
/**
 * Hook run when a call is started. It receives the outgoing metadata and the
 * listener for inbound events, and continues the call by invoking `next`
 * with the metadata and listener that the next call should be started with.
 */
export interface MetadataRequester {
  (
    metadata: Metadata,
    listener: InterceptingListener,
    next: (
      metadata: Metadata,
      listener: InterceptingListener | Listener
    ) => void
  ): void;
}
|
61 |
|
/**
 * Hook run for each outgoing message. Invoke `next` with the message that
 * should be forwarded to the next call.
 */
export interface MessageRequester {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
|
66 |
|
/**
 * Hook run when the caller half-closes the call. Invoke `next` to let the
 * half-close continue to the next call.
 */
export interface CloseRequester {
  (next: () => void): void;
}
|
70 |
|
/**
 * Hook run when the call is cancelled. Invoke `next` to let the cancellation
 * continue to the next call.
 */
export interface CancelRequester {
  (next: () => void): void;
}
|
74 |
|
75 |
|
76 |
|
77 |
|
/**
 * A requester with every outbound hook present: one for starting the call,
 * one for sending a message, one for half-closing and one for cancelling.
 */
export interface FullRequester {
  /** Hook applied to the outgoing metadata and listener at call start. */
  start: MetadataRequester;
  /** Hook applied to each outgoing message. */
  sendMessage: MessageRequester;
  /** Hook applied when the call is half-closed. */
  halfClose: CloseRequester;
  /** Hook applied when the call is cancelled. */
  cancel: CancelRequester;
}
|
84 |
|
/**
 * A requester in which any subset of the hooks may be omitted.
 */
export type Requester = Partial<FullRequester>;
|
86 |
|
/**
 * Fluent builder for a `Listener`. Each `with…` method records one handler
 * and returns the builder so calls can be chained; handlers that were never
 * set are emitted as `undefined`.
 */
export class ListenerBuilder {
  private metadata: MetadataListener | undefined = undefined;
  private message: MessageListener | undefined = undefined;
  private status: StatusListener | undefined = undefined;

  /** Records the handler for received metadata. */
  withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
    this.metadata = onReceiveMetadata;
    return this;
  }

  /** Records the handler for received messages. */
  withOnReceiveMessage(onReceiveMessage: MessageListener): this {
    this.message = onReceiveMessage;
    return this;
  }

  /** Records the handler for the received status. */
  withOnReceiveStatus(onReceiveStatus: StatusListener): this {
    this.status = onReceiveStatus;
    return this;
  }

  /**
   * @returns A new listener object holding the handlers recorded so far.
   */
  build(): Listener {
    const { metadata, message, status } = this;
    return {
      onReceiveMetadata: metadata,
      onReceiveMessage: message,
      onReceiveStatus: status,
    };
  }
}
|
115 |
|
/**
 * Fluent builder for a `Requester`. Each `with…` method records one hook and
 * returns the builder so calls can be chained; hooks that were never set are
 * emitted as `undefined`.
 */
export class RequesterBuilder {
  private start: MetadataRequester | undefined = undefined;
  private message: MessageRequester | undefined = undefined;
  private halfClose: CloseRequester | undefined = undefined;
  private cancel: CancelRequester | undefined = undefined;

  /** Records the hook run when the call is started. */
  withStart(start: MetadataRequester): this {
    this.start = start;
    return this;
  }

  /** Records the hook run for each outgoing message. */
  withSendMessage(sendMessage: MessageRequester): this {
    this.message = sendMessage;
    return this;
  }

  /** Records the hook run when the call is half-closed. */
  withHalfClose(halfClose: CloseRequester): this {
    this.halfClose = halfClose;
    return this;
  }

  /** Records the hook run when the call is cancelled. */
  withCancel(cancel: CancelRequester): this {
    this.cancel = cancel;
    return this;
  }

  /**
   * @returns A new requester object holding the hooks recorded so far.
   */
  build(): Requester {
    const { start, message, halfClose, cancel } = this;
    return {
      start,
      sendMessage: message,
      halfClose,
      cancel,
    };
  }
}
|
151 |
|
152 |
|
153 |
|
154 |
|
155 |
|
/**
 * Pass-through listener: every handler forwards its argument to `next`
 * unchanged. Used to fill in handlers that a partial listener omits.
 */
const defaultListener: FullListener = {
  onReceiveMetadata(metadata, next) {
    next(metadata);
  },
  onReceiveMessage(message, next) {
    next(message);
  },
  onReceiveStatus(status, next) {
    next(status);
  },
};
|
167 |
|
168 |
|
169 |
|
170 |
|
171 |
|
/**
 * Pass-through requester: every hook immediately calls `next` with the
 * values it was given. Used to fill in hooks that a partial requester omits.
 */
const defaultRequester: FullRequester = {
  start(metadata, listener, next) {
    next(metadata, listener);
  },
  sendMessage(message, next) {
    next(message);
  },
  halfClose(next) {
    next();
  },
  cancel(next) {
    next();
  },
};
|
186 |
|
/**
 * Options handed to each interceptor: the call options plus the definition
 * of the method being invoked.
 */
export interface InterceptorOptions extends CallOptions {
  /** Definition of the method this call invokes. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  method_definition: ClientMethodDefinition<any, any>;
}
|
191 |
|
/**
 * Operations shared by every link of an interceptor chain, from the
 * outermost `InterceptingCall` down to the call that talks to the channel.
 */
export interface InterceptingCallInterface {
  /** Cancels the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns the peer string reported by the underlying call. */
  getPeer(): string;
  /** Starts the call with outgoing metadata and an optional listener. */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
  /** Sends one message together with its message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void;
  /** Sends one message. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void;
  /** Starts reading from the call. */
  startRead(): void;
  /** Half-closes the call. */
  halfClose(): void;
}
|
203 |
|
/**
 * One link in an interceptor chain. Every outbound operation is first passed
 * through the matching requester hook and then forwarded to `nextCall`.
 *
 * Requester hooks may call their `next` callback asynchronously, so a message
 * or half-close that becomes ready while an earlier operation is still inside
 * a hook is parked and forwarded once that earlier operation has gone through.
 */
export class InterceptingCall implements InterceptingCallInterface {
  /** Requester with every hook present; omitted hooks are pass-through. */
  private requester: FullRequester;

  /**
   * True from the moment metadata is handed to the requester's `start` hook
   * until that hook invokes its `next` callback.
   */
  private processingMetadata = false;

  /** Context of a message parked until the metadata has been forwarded. */
  private pendingMessageContext: MessageContext | null = null;
  /** The parked message itself, as produced by the `sendMessage` hook. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;

  /**
   * True from the moment a message is handed to the requester's
   * `sendMessage` hook until that hook invokes its `next` callback.
   */
  private processingMessage = false;

  /**
   * True when a half-close was requested while metadata or a message was
   * still being processed and has not yet been forwarded.
   */
  private pendingHalfClose = false;

  /**
   * @param nextCall The next call in the chain; receives every operation
   *     after the requester has processed it.
   * @param requester Optional hooks; any hook left out passes through.
   */
  constructor(
    private nextCall: InterceptingCallInterface,
    requester?: Requester
  ) {
    if (requester) {
      this.requester = {
        start: requester.start ?? defaultRequester.start,
        sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
        halfClose: requester.halfClose ?? defaultRequester.halfClose,
        cancel: requester.cancel ?? defaultRequester.cancel,
      };
    } else {
      this.requester = defaultRequester;
    }
  }

  /**
   * Runs the `cancel` hook and, once it continues, cancels the next call.
   */
  cancelWithStatus(status: Status, details: string): void {
    this.requester.cancel(() => {
      this.nextCall.cancelWithStatus(status, details);
    });
  }

  /** Returns the peer reported by the next call. */
  getPeer(): string {
    return this.nextCall.getPeer();
  }

  /** Forwards the parked message, if there is one, and clears the slot. */
  private processPendingMessage() {
    if (this.pendingMessageContext) {
      this.nextCall.sendMessageWithContext(
        this.pendingMessageContext,
        this.pendingMessage
      );
      this.pendingMessageContext = null;
      this.pendingMessage = null;
    }
  }

  /** Forwards a parked half-close, if there is one, exactly once. */
  private processPendingHalfClose() {
    if (this.pendingHalfClose) {
      // Clear the flag before forwarding so that a later flush cannot
      // half-close the next call a second time.
      this.pendingHalfClose = false;
      this.nextCall.halfClose();
    }
  }

  /**
   * Starts the call. The metadata and a fully populated listener go through
   * the `start` hook; when the hook continues, the next call is started and
   * any message or half-close parked in the meantime is flushed, in that
   * order.
   *
   * @param metadata Outgoing metadata.
   * @param interceptingListener Listener for inbound events; missing handlers
   *     are replaced by no-ops.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // The no-op fallbacks keep their single declared parameter on purpose.
    // NOTE(review): isInterceptingListener may tell listener kinds apart by
    // handler arity - confirm before changing the shape of these functions.
    const fullInterceptingListener: InterceptingListener = {
      onReceiveMetadata:
        interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
        ((metadata) => {}),
      onReceiveMessage:
        interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
        ((message) => {}),
      onReceiveStatus:
        interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
        ((status) => {}),
    };
    this.processingMetadata = true;
    this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
      this.processingMetadata = false;
      let finalInterceptingListener: InterceptingListener;
      if (isInterceptingListener(listener)) {
        finalInterceptingListener = listener;
      } else {
        // A plain listener is completed with pass-through handlers and
        // wrapped so that its output reaches the caller's listener.
        const fullListener: FullListener = {
          onReceiveMetadata:
            listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
          onReceiveMessage:
            listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
          onReceiveStatus:
            listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
        };
        finalInterceptingListener = new InterceptingListenerImpl(
          fullListener,
          fullInterceptingListener
        );
      }
      this.nextCall.start(md, finalInterceptingListener);
      this.processPendingMessage();
      this.processPendingHalfClose();
    });
  }

  /**
   * Sends a message through the `sendMessage` hook. If the metadata is still
   * being processed when the hook continues, the hook's output is parked
   * until the next call has been started; otherwise it is forwarded at once,
   * followed by any parked half-close.
   *
   * NOTE(review): `processingMessage` is a single flag and there is a single
   * parked-message slot, so overlapping asynchronous sends are not fully
   * ordered by this class.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    this.processingMessage = true;
    this.requester.sendMessage(message, (finalMessage) => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessageContext = context;
        // Park the hook's output, not the original input, so the
        // interceptor's transformation is kept when the send is deferred.
        this.pendingMessage = finalMessage;
      } else {
        this.nextCall.sendMessageWithContext(context, finalMessage);
        this.processPendingHalfClose();
      }
    });
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void {
    this.sendMessageWithContext({}, message);
  }

  /** Forwards the read request directly to the next call. */
  startRead(): void {
    this.nextCall.startRead();
  }

  /**
   * Half-closes through the `halfClose` hook. When the hook continues while
   * metadata or a message is still being processed, the half-close is parked;
   * otherwise it is forwarded immediately.
   */
  halfClose(): void {
    this.requester.halfClose(() => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingHalfClose = true;
      } else {
        this.nextCall.halfClose();
      }
    });
  }
}
|
340 |
|
/**
 * Creates a call on `channel` for `path` from the given call options.
 *
 * A missing deadline becomes `Infinity` and a missing parent becomes `null`.
 * Credentials, when present, are applied to the new call before it is
 * returned.
 */
function getCall(channel: Channel, path: string, options: CallOptions): Call {
  const {
    deadline,
    host,
    parent,
    propagate_flags: propagateFlags,
    credentials,
  } = options;
  const newCall = channel.createCall(
    path,
    deadline ?? Infinity,
    host,
    parent ?? null,
    propagateFlags
  );
  if (credentials) {
    newCall.setCredentials(credentials);
  }
  return newCall;
}
|
353 |
|
354 |
|
355 |
|
356 |
|
357 |
|
/**
 * Bottom link of an interceptor chain. It adapts the chain to a raw `Call`
 * by serializing outgoing messages and deserializing incoming ones with the
 * method definition's codecs.
 */
class BaseInterceptingCall implements InterceptingCallInterface {
  /**
   * @param call The underlying call that operations are delegated to.
   * @param methodDefinition Supplies `requestSerialize` and
   *     `responseDeserialize` for this method.
   */
  constructor(
    protected call: Call,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    protected methodDefinition: ClientMethodDefinition<any, any>
  ) {}

  /** Cancels the underlying call. */
  cancelWithStatus(status: Status, details: string): void {
    this.call.cancelWithStatus(status, details);
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /**
   * Serializes `message` and sends it on the underlying call. If
   * serialization throws, the call is cancelled with `Status.INTERNAL` and
   * nothing is sent.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    let payload: Buffer;
    try {
      payload = this.methodDefinition.requestSerialize(message);
    } catch (error) {
      this.call.cancelWithStatus(
        Status.INTERNAL,
        `Request message serialization failure: ${getErrorMessage(error)}`
      );
      return;
    }
    this.call.sendMessageWithContext(context, payload);
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any) {
    this.sendMessageWithContext({}, message);
  }

  /**
   * Starts the underlying call and relays its events to
   * `interceptingListener`, deserializing each message on the way.
   *
   * If deserialization throws, the call is cancelled with `Status.INTERNAL`,
   * the message is not delivered, and that parse-error status is what the
   * listener later receives in place of the call's own status.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // Set once a response fails to parse; overrides the final status.
    let readError: StatusObject | null = null;
    this.call.start(metadata, {
      onReceiveMetadata: (receivedMetadata) => {
        interceptingListener?.onReceiveMetadata?.(receivedMetadata);
      },
      onReceiveMessage: (rawMessage) => {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        let parsed: any;
        try {
          parsed = this.methodDefinition.responseDeserialize(rawMessage);
        } catch (error) {
          const failure: StatusObject = {
            code: Status.INTERNAL,
            details: `Response message parsing error: ${getErrorMessage(error)}`,
            metadata: new Metadata(),
          };
          readError = failure;
          this.call.cancelWithStatus(failure.code, failure.details);
          return;
        }
        interceptingListener?.onReceiveMessage?.(parsed);
      },
      onReceiveStatus: (status) => {
        interceptingListener?.onReceiveStatus?.(readError ?? status);
      },
    });
  }

  /** Starts reading from the underlying call. */
  startRead() {
    this.call.startRead();
  }

  /** Half-closes the underlying call. */
  halfClose(): void {
    this.call.halfClose();
  }
}
|
429 |
|
430 |
|
431 |
|
432 |
|
433 |
|
/**
 * Bottom link used when the response is not a stream. It starts reading as
 * soon as the call is started, and delivers a `null` message before the
 * status if no message was received.
 */
class BaseUnaryInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {
  /**
   * Starts the call with a wrapper around `listener`, then immediately
   * requests a read on the underlying call.
   */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
    let sawMessage = false;
    const unaryListener: InterceptingListener = {
      onReceiveMetadata:
        listener?.onReceiveMetadata?.bind(listener) ?? ((metadata) => {}),
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any) => {
        sawMessage = true;
        listener?.onReceiveMessage?.(message);
      },
      onReceiveStatus: (status: StatusObject) => {
        // Always give the listener a message event before the status.
        if (!sawMessage) {
          listener?.onReceiveMessage?.(null);
        }
        listener?.onReceiveStatus?.(status);
      },
    };
    super.start(metadata, unaryListener);
    this.call.startRead();
  }
}
|
462 |
|
463 |
|
464 |
|
465 |
|
466 |
|
/**
 * Bottom link used when the response is a stream. It adds nothing to
 * `BaseInterceptingCall`.
 */
class BaseStreamingInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {}
|
470 |
|
/**
 * Creates the call at the bottom of the interceptor chain: a streaming base
 * call when the method's response is a stream, a unary base call otherwise.
 */
function getBottomInterceptingCall(
  channel: Channel,
  options: InterceptorOptions,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>
) {
  const rawCall = getCall(channel, methodDefinition.path, options);
  return methodDefinition.responseStream
    ? new BaseStreamingInterceptingCall(rawCall, methodDefinition)
    : new BaseUnaryInterceptingCall(rawCall, methodDefinition);
}
|
484 |
|
/**
 * Function an interceptor calls to create the rest of the chain below it
 * for the given options.
 */
export interface NextCall {
  (options: InterceptorOptions): InterceptingCallInterface;
}
|
488 |
|
/**
 * An interceptor: given the options for a call and a `NextCall` that builds
 * the rest of the chain, it returns the `InterceptingCall` for its own link.
 */
export interface Interceptor {
  (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}
|
492 |
|
/**
 * Function that is given a method definition and returns the interceptor to
 * use for that method.
 */
export interface InterceptorProvider {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
|
497 |
|
/**
 * The interceptor configuration consumed by `getInterceptingCall`, split
 * into a client-level pair and a call-level pair of lists.
 */
export interface InterceptorArguments {
  /** Interceptors given to the client constructor. */
  clientInterceptors: Interceptor[];
  /** Interceptor providers given to the client constructor. */
  clientInterceptorProviders: InterceptorProvider[];
  /** Interceptors given as call options. */
  callInterceptors: Interceptor[];
  /** Interceptor providers given as call options. */
  callInterceptorProviders: InterceptorProvider[];
}
|
504 |
|
/**
 * Builds the interceptor chain for one call and returns its outermost link.
 *
 * Call-level interceptors or providers, when any are present, are used
 * instead of the client-level ones. Providers are invoked with
 * `methodDefinition`, and falsy results are dropped. The first interceptor
 * in the resulting list is outermost; the innermost `NextCall` creates the
 * base call on `channel`.
 *
 * @param interceptorArgs Client-level and call-level interceptor lists.
 * @param methodDefinition Definition of the method being called.
 * @param options Call options; copied and extended with `method_definition`
 *     before being handed to the chain.
 * @param channel Channel the bottom call is created on.
 * @returns The call that the caller should drive.
 * @throws InterceptorConfigurationError If interceptors and interceptor
 *     providers are both non-empty at the same level.
 */
export function getInterceptingCall(
  interceptorArgs: InterceptorArguments,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>,
  options: CallOptions,
  channel: Channel
): InterceptingCallInterface {
  const {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors,
    callInterceptorProviders,
  } = interceptorArgs;

  if (clientInterceptors.length > 0 && clientInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as options ' +
        'to the client constructor. Only one of these is allowed.'
    );
  }
  if (callInterceptors.length > 0 && callInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as call ' +
        'options. Only one of these is allowed.'
    );
  }

  // Any call-level configuration takes the place of the client-level one.
  const useCallLevel =
    callInterceptors.length > 0 || callInterceptorProviders.length > 0;
  const direct = useCallLevel ? callInterceptors : clientInterceptors;
  const providers = useCallLevel
    ? callInterceptorProviders
    : clientInterceptorProviders;

  const interceptors: Interceptor[] = ([] as Interceptor[])
    .concat(
      direct,
      providers.map((provider) => provider(methodDefinition))
    )
    .filter((interceptor) => interceptor);

  const interceptorOptions = Object.assign({}, options, {
    method_definition: methodDefinition,
  });

  // Build the chain from the bottom up, so that each interceptor receives a
  // NextCall that creates everything below it.
  let chain: NextCall = (finalOptions: InterceptorOptions) =>
    getBottomInterceptingCall(channel, finalOptions, methodDefinition);
  for (const interceptor of interceptors.slice().reverse()) {
    const downstream = chain;
    chain = (currentOptions) => interceptor(currentOptions, downstream);
  }
  return chain(interceptorOptions);
}
|