UNPKG

18.1 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { Metadata } from './metadata';
19import {
20 StatusObject,
21 Listener,
22 MetadataListener,
23 MessageListener,
24 StatusListener,
25 FullListener,
26 InterceptingListener,
27 InterceptingListenerImpl,
28 isInterceptingListener,
29 MessageContext,
30 Call,
31} from './call-stream';
32import { Status } from './constants';
33import { Channel } from './channel';
34import { CallOptions } from './client';
35import { CallCredentials } from './call-credentials';
36import { ClientMethodDefinition } from './make-client';
37
/**
 * Error raised when interceptors and interceptor providers are supplied
 * together, either to the client constructor or in the options of a single
 * call. Only one of the two forms may be used at each level.
 */
export class InterceptorConfigurationError extends Error {
  constructor(message: string) {
    super(message);
    // Label the error with its own class name rather than the generic 'Error'.
    this.name = 'InterceptorConfigurationError';
    // Start the recorded stack at the caller by omitting this constructor.
    Error.captureStackTrace(this, InterceptorConfigurationError);
  }
}
49
/**
 * Hook invoked when a call is started. It receives the outgoing metadata and
 * the listener for incoming events, and continues the call by passing
 * (possibly replaced) values to `next`.
 */
export interface MetadataRequester {
  (
    metadata: Metadata,
    listener: InterceptingListener,
    next: (
      metadata: Metadata,
      listener: InterceptingListener | Listener
    ) => void
  ): void;
}

/**
 * Hook invoked for each outgoing message. The value handed to `next` is the
 * one that is forwarded down the chain.
 */
export interface MessageRequester {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}

/**
 * Hook invoked when the call is half-closed. Calling `next` lets the
 * half-close proceed.
 */
export interface CloseRequester {
  (next: () => void): void;
}

/**
 * Hook invoked when the call is cancelled. Calling `next` lets the
 * cancellation proceed.
 */
export interface CancelRequester {
  (next: () => void): void;
}

/**
 * An object with methods for intercepting and modifying outgoing call
 * operations. Every hook is present.
 */
export interface FullRequester {
  start: MetadataRequester;
  sendMessage: MessageRequester;
  halfClose: CloseRequester;
  cancel: CancelRequester;
}

/**
 * A requester in which any hook may be left out.
 */
export type Requester = Partial<FullRequester>;
85
/**
 * Fluent builder for a Listener. Each `with...` method records one handler
 * and returns the builder so calls can be chained; handlers that are never
 * set stay undefined in the built object.
 */
export class ListenerBuilder {
  private onMetadata: MetadataListener | undefined = undefined;
  private onMessage: MessageListener | undefined = undefined;
  private onStatus: StatusListener | undefined = undefined;

  /** Records the handler for incoming metadata. */
  withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
    this.onMetadata = onReceiveMetadata;
    return this;
  }

  /** Records the handler for incoming messages. */
  withOnReceiveMessage(onReceiveMessage: MessageListener): this {
    this.onMessage = onReceiveMessage;
    return this;
  }

  /** Records the handler for the incoming status. */
  withOnReceiveStatus(onReceiveStatus: StatusListener): this {
    this.onStatus = onReceiveStatus;
    return this;
  }

  /** Produces a Listener holding the handlers recorded so far. */
  build(): Listener {
    const { onMetadata, onMessage, onStatus } = this;
    return {
      onReceiveMetadata: onMetadata,
      onReceiveMessage: onMessage,
      onReceiveStatus: onStatus,
    };
  }
}
114
/**
 * Fluent builder for a Requester. Each `with...` method records one hook and
 * returns the builder so calls can be chained; hooks that are never set stay
 * undefined in the built object.
 */
export class RequesterBuilder {
  private startHook: MetadataRequester | undefined = undefined;
  private messageHook: MessageRequester | undefined = undefined;
  private halfCloseHook: CloseRequester | undefined = undefined;
  private cancelHook: CancelRequester | undefined = undefined;

  /** Records the hook for starting the call. */
  withStart(start: MetadataRequester): this {
    this.startHook = start;
    return this;
  }

  /** Records the hook for outgoing messages. */
  withSendMessage(sendMessage: MessageRequester): this {
    this.messageHook = sendMessage;
    return this;
  }

  /** Records the hook for half-closing the call. */
  withHalfClose(halfClose: CloseRequester): this {
    this.halfCloseHook = halfClose;
    return this;
  }

  /** Records the hook for cancelling the call. */
  withCancel(cancel: CancelRequester): this {
    this.cancelHook = cancel;
    return this;
  }

  /** Produces a Requester holding the hooks recorded so far. */
  build(): Requester {
    const { startHook, messageHook, halfCloseHook, cancelHook } = this;
    return {
      start: startHook,
      sendMessage: messageHook,
      halfClose: halfCloseHook,
      cancel: cancelHook,
    };
  }
}
150
/**
 * Pass-through Listener: every handler forwards its argument to `next`
 * unchanged. Used to fill in handlers that a partial Listener leaves out.
 */
const defaultListener: FullListener = {
  onReceiveMetadata(metadata, next) {
    next(metadata);
  },
  onReceiveMessage(message, next) {
    next(message);
  },
  onReceiveStatus(status, next) {
    next(status);
  },
};
166
/**
 * Pass-through Requester: every hook immediately calls `next` with the
 * arguments it was given. Used to fill in hooks that a partial Requester
 * leaves out.
 */
const defaultRequester: FullRequester = {
  start(metadata, listener, next) {
    next(metadata, listener);
  },
  sendMessage(message, next) {
    next(message);
  },
  halfClose(next) {
    next();
  },
  cancel(next) {
    next();
  },
};
185
/**
 * The options object handed to each interceptor: the call options plus the
 * definition of the method being invoked.
 */
export interface InterceptorOptions extends CallOptions {
  /** Definition of the method that the intercepted call invokes. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  method_definition: ClientMethodDefinition<any, any>;
}
190
/**
 * The set of operations that every link in an interceptor chain exposes to
 * the link above it.
 */
export interface InterceptingCallInterface {
  /** Cancels the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns the peer string reported for the call. */
  getPeer(): string;
  /**
   * Starts the call with the given metadata. Incoming events are reported to
   * whichever handlers of `listener` are present.
   */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
  /** Sends a message together with a caller-supplied message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void;
  /** Sends a message without supplying a message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void;
  /** Requests a read from the call. */
  startRead(): void;
  /** Half-closes the call. */
  halfClose(): void;

  /** Sets the credentials used by the call. */
  setCredentials(credentials: CallCredentials): void;
}
204
/**
 * A link in the interceptor chain. Each outgoing operation is first handed to
 * the corresponding Requester hook; when the hook invokes its `next` callback
 * the operation is forwarded to the next call in the chain.
 *
 * Hooks may invoke `next` asynchronously, so a message or a half-close can be
 * ready before `start` has been forwarded. Such operations are held back and
 * released in order: start, then the message, then the half-close.
 */
export class InterceptingCall implements InterceptingCallInterface {
  /**
   * The requester that this InterceptingCall uses to modify outgoing operations
   */
  private requester: FullRequester;
  /**
   * Indicates that metadata has been passed to the requester's start
   * method but it has not been passed to the corresponding next callback
   */
  private processingMetadata = false;
  /**
   * Context of a message that has already been through the requester's
   * sendMessage hook but is waiting for metadata processing to finish before
   * it can be forwarded
   */
  private pendingMessageContext: MessageContext | null = null;
  /**
   * The already-intercepted message that belongs to pendingMessageContext
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;
  /**
   * Indicates that a message has been passed to the requester's sendMessage
   * method but it has not been passed to the corresponding next callback
   */
  private processingMessage = false;
  /**
   * Indicates that a half-close was requested but could not be forwarded
   * because metadata or a message was still being processed
   */
  private pendingHalfClose = false;

  /**
   * @param nextCall The call that operations are forwarded to.
   * @param requester Hooks for outgoing operations. Any hook that is left out
   *     falls back to the pass-through default.
   */
  constructor(
    private nextCall: InterceptingCallInterface,
    requester?: Requester
  ) {
    if (requester) {
      this.requester = {
        start: requester.start ?? defaultRequester.start,
        sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
        halfClose: requester.halfClose ?? defaultRequester.halfClose,
        cancel: requester.cancel ?? defaultRequester.cancel,
      };
    } else {
      this.requester = defaultRequester;
    }
  }

  /** Runs the cancel hook, then cancels the next call with the given status. */
  cancelWithStatus(status: Status, details: string): void {
    this.requester.cancel(() => {
      this.nextCall.cancelWithStatus(status, details);
    });
  }

  /** Returns the peer reported by the next call. */
  getPeer(): string {
    return this.nextCall.getPeer();
  }

  /** Forwards the message that was held back while metadata was pending. */
  private processPendingMessage(): void {
    if (this.pendingMessageContext) {
      this.nextCall.sendMessageWithContext(
        this.pendingMessageContext,
        this.pendingMessage
      );
      this.pendingMessageContext = null;
      this.pendingMessage = null;
    }
  }

  /**
   * Forwards a held-back half-close once nothing is in flight any more. The
   * flag is cleared first so the half-close is forwarded at most once, and
   * the half-close keeps waiting while a message is still inside the
   * sendMessage hook so that it can never overtake that message.
   */
  private processPendingHalfClose(): void {
    if (
      !this.pendingHalfClose ||
      this.processingMetadata ||
      this.processingMessage
    ) {
      return;
    }
    this.pendingHalfClose = false;
    this.nextCall.halfClose();
  }

  /**
   * Starts the call. The metadata and a fully populated listener are handed
   * to the start hook; once the hook calls `next`, the next call is started
   * and any message or half-close that was held back is released.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // Missing handlers become no-ops so the start hook always sees a complete
    // listener.
    const fullInterceptingListener: InterceptingListener = {
      onReceiveMetadata:
        interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
        ((metadata) => {}),
      onReceiveMessage:
        interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
        ((message) => {}),
      onReceiveStatus:
        interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
        ((status) => {}),
    };
    this.processingMetadata = true;
    this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
      this.processingMetadata = false;
      let finalInterceptingListener: InterceptingListener;
      if (isInterceptingListener(listener)) {
        finalInterceptingListener = listener;
      } else {
        // A plain Listener is completed with pass-through handlers and then
        // wrapped around the listener that was given to the hook.
        const fullListener: FullListener = {
          onReceiveMetadata:
            listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
          onReceiveMessage:
            listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
          onReceiveStatus:
            listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
        };
        finalInterceptingListener = new InterceptingListenerImpl(
          fullListener,
          fullInterceptingListener
        );
      }
      this.nextCall.start(md, finalInterceptingListener);
      this.processPendingMessage();
      this.processPendingHalfClose();
    });
  }

  /**
   * Runs the message through the sendMessage hook and forwards the hook's
   * result. If the call has not been started downstream yet, the result is
   * held until it has.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    this.processingMessage = true;
    this.requester.sendMessage(message, (finalMessage) => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessageContext = context;
        // Keep the hook's output, not the raw input, so the interception is
        // not lost when the message has to wait for start.
        this.pendingMessage = finalMessage;
      } else {
        this.nextCall.sendMessageWithContext(context, finalMessage);
        this.processPendingHalfClose();
      }
    });
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void {
    this.sendMessageWithContext({}, message);
  }

  /** Requests a read from the next call. */
  startRead(): void {
    this.nextCall.startRead();
  }

  /**
   * Runs the halfClose hook, then half-closes the next call. If metadata or a
   * message is still being processed, the half-close is held until that work
   * has been forwarded.
   */
  halfClose(): void {
    this.requester.halfClose(() => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingHalfClose = true;
      } else {
        this.nextCall.halfClose();
      }
    });
  }

  /** Passes the credentials on to the next call. */
  setCredentials(credentials: CallCredentials): void {
    this.nextCall.setCredentials(credentials);
  }
}
344
/**
 * Creates the underlying call for `path` on `channel` from the call options.
 * A missing deadline becomes Infinity and a missing parent becomes null;
 * credentials, when provided, are attached to the new call.
 */
function getCall(channel: Channel, path: string, options: CallOptions): Call {
  const {
    deadline,
    host,
    parent,
    propagate_flags: propagateFlags,
    credentials,
  } = options;
  const call = channel.createCall(
    path,
    deadline ?? Infinity,
    host,
    parent ?? null,
    propagateFlags
  );
  if (credentials) {
    call.setCredentials(credentials);
  }
  return call;
}
357
/**
 * Produces readable text for a value caught in a `catch` clause. Objects that
 * expose a `message` property (such as Error instances) yield that message;
 * anything else, including null and undefined, is converted with String().
 */
function getErrorMessage(error: unknown): string {
  if (typeof error === 'object' && error !== null && 'message' in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/**
 * InterceptingCall implementation that directly owns the underlying Call
 * object and handles serialization and deserialization.
 */
class BaseInterceptingCall implements InterceptingCallInterface {
  /**
   * @param call The underlying call that operations are delegated to.
   * @param methodDefinition Supplies the request serializer and the response
   *     deserializer.
   */
  constructor(
    protected call: Call,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    protected methodDefinition: ClientMethodDefinition<any, any>
  ) {}

  /** Cancels the underlying call with the given status. */
  cancelWithStatus(status: Status, details: string): void {
    this.call.cancelWithStatus(status, details);
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Sets the credentials on the underlying call. */
  setCredentials(credentials: CallCredentials): void {
    this.call.setCredentials(credentials);
  }

  /**
   * Serializes the message and sends it on the underlying call. If the
   * serializer throws, the call is cancelled with INTERNAL and nothing is
   * sent.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    let serialized: Buffer;
    try {
      serialized = this.methodDefinition.requestSerialize(message);
    } catch (e) {
      // The serializer may throw any value, so the text is extracted
      // defensively rather than by reading e.message directly.
      this.call.cancelWithStatus(
        Status.INTERNAL,
        `Request message serialization failure: ${getErrorMessage(e)}`
      );
      return;
    }
    this.call.sendMessageWithContext(context, serialized);
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void {
    this.sendMessageWithContext({}, message);
  }

  /**
   * Starts the underlying call. Incoming messages are deserialized before
   * they reach the listener. If deserialization fails, the call is cancelled
   * and the parsing error replaces the status that is later reported.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // Remembers a deserialization failure until the final status arrives.
    let readError: StatusObject | null = null;
    this.call.start(metadata, {
      onReceiveMetadata: (metadata) => {
        interceptingListener?.onReceiveMetadata?.(metadata);
      },
      onReceiveMessage: (message) => {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        let deserialized: any;
        try {
          deserialized = this.methodDefinition.responseDeserialize(message);
        } catch (e) {
          readError = {
            code: Status.INTERNAL,
            details: `Response message parsing error: ${getErrorMessage(e)}`,
            metadata: new Metadata(),
          };
          this.call.cancelWithStatus(readError.code, readError.details);
          return;
        }
        interceptingListener?.onReceiveMessage?.(deserialized);
      },
      onReceiveStatus: (status) => {
        if (readError) {
          interceptingListener?.onReceiveStatus?.(readError);
        } else {
          interceptingListener?.onReceiveStatus?.(status);
        }
      },
    });
  }

  /** Requests a read from the underlying call. */
  startRead(): void {
    this.call.startRead();
  }

  /** Half-closes the underlying call. */
  halfClose(): void {
    this.call.halfClose();
  }
}
436
/**
 * BaseInterceptingCall variant for methods with a unary response. It
 * requests a read as soon as the call has been started, and if the status
 * arrives before any message it reports a null message first.
 */
class BaseUnaryInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
    // Tracks whether the listener has been given a message yet.
    let sawMessage = false;
    const unaryListener: InterceptingListener = {
      onReceiveMetadata:
        listener?.onReceiveMetadata?.bind(listener) ?? ((metadata) => {}),
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any) => {
        sawMessage = true;
        listener?.onReceiveMessage?.(message);
      },
      onReceiveStatus: (status: StatusObject) => {
        if (!sawMessage) {
          // No response message arrived; report null before the status.
          listener?.onReceiveMessage?.(null);
        }
        listener?.onReceiveStatus?.(status);
      },
    };
    super.start(metadata, unaryListener);
    this.call.startRead();
  }
}
469
/**
 * BaseInterceptingCall variant for methods with a streaming response. It
 * adds nothing to the base behavior and exists so that streaming calls have
 * their own type.
 */
class BaseStreamingInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {}
477
/**
 * Builds the last link of an interceptor chain: creates the underlying call
 * on the channel and wraps it in the streaming or unary base call, depending
 * on whether the method streams its response.
 */
function getBottomInterceptingCall(
  channel: Channel,
  options: InterceptorOptions,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>
) {
  const underlyingCall = getCall(channel, methodDefinition.path, options);
  return methodDefinition.responseStream
    ? new BaseStreamingInterceptingCall(underlyingCall, methodDefinition)
    : new BaseUnaryInterceptingCall(underlyingCall, methodDefinition);
}
491
/**
 * Function an interceptor calls to obtain the next call in the chain for the
 * given options.
 */
export interface NextCall {
  (options: InterceptorOptions): InterceptingCallInterface;
}

/**
 * A client interceptor: given the call options and a way to obtain the next
 * call in the chain, it returns the InterceptingCall to use.
 */
export interface Interceptor {
  (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}

/**
 * Produces an interceptor for a specific method definition.
 */
export interface InterceptorProvider {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}

/**
 * The interceptors and interceptor providers configured for a call, split
 * into those given to the client constructor and those given as call options.
 */
export interface InterceptorArguments {
  clientInterceptors: Interceptor[];
  clientInterceptorProviders: InterceptorProvider[];
  callInterceptors: Interceptor[];
  callInterceptorProviders: InterceptorProvider[];
}
511
/**
 * Builds the interceptor chain for one call and returns its outermost link.
 *
 * Interceptors given as call options replace those given to the client
 * constructor. At either level, supplying both interceptors and interceptor
 * providers is rejected.
 *
 * @param interceptorArgs Interceptors and providers from the client and call.
 * @param methodDefinition Definition of the method being invoked; it is given
 *     to each provider and added to the options as `method_definition`.
 * @param options Call options handed to the first interceptor.
 * @param channel Channel on which the underlying call is created.
 * @throws InterceptorConfigurationError if interceptors and providers are
 *     both present at the client level or both present at the call level.
 */
export function getInterceptingCall(
  interceptorArgs: InterceptorArguments,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>,
  options: CallOptions,
  channel: Channel
): InterceptingCallInterface {
  const {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors,
    callInterceptorProviders,
  } = interceptorArgs;
  if (clientInterceptors.length > 0 && clientInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as options ' +
        'to the client constructor. Only one of these is allowed.'
    );
  }
  if (callInterceptors.length > 0 && callInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as call ' +
        'options. Only one of these is allowed.'
    );
  }
  // Interceptors passed to the call override interceptors passed to the
  // client constructor.
  const useCallLevel =
    callInterceptors.length > 0 || callInterceptorProviders.length > 0;
  const directInterceptors = useCallLevel
    ? callInterceptors
    : clientInterceptors;
  const providers = useCallLevel
    ? callInterceptorProviders
    : clientInterceptorProviders;
  // Direct interceptors come first, then those created by the providers.
  // Falsy entries (a provider that returns nothing) are left out.
  const interceptors: Interceptor[] = [];
  for (const interceptor of directInterceptors) {
    if (interceptor) {
      interceptors.push(interceptor);
    }
  }
  for (const provider of providers) {
    const provided = provider(methodDefinition);
    if (provided) {
      interceptors.push(provided);
    }
  }
  const interceptorOptions = {
    ...options,
    method_definition: methodDefinition,
  };
  /* The chain is assembled back to front. The innermost nextCall creates the
   * bottom call, which handles (de)serialization and obtains the underlying
   * call from the channel. Each interceptor, taken from last to first, is
   * then wrapped around the nextCall built so far, so that the first
   * interceptor in the list ends up outermost. */
  let chain: NextCall = (finalOptions: InterceptorOptions) =>
    getBottomInterceptingCall(channel, finalOptions, methodDefinition);
  for (let index = interceptors.length - 1; index >= 0; index--) {
    const interceptor = interceptors[index];
    const downstream = chain;
    chain = (currentOptions) => interceptor(currentOptions, downstream);
  }
  return chain(interceptorOptions);
}