UNPKG

17.9 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { Metadata } from './metadata';
19import {
20 StatusObject,
21 Listener,
22 MetadataListener,
23 MessageListener,
24 StatusListener,
25 FullListener,
26 InterceptingListener,
27 InterceptingListenerImpl,
28 isInterceptingListener,
29 MessageContext,
30 Call,
31} from './call-interface';
32import { Status } from './constants';
33import { Channel } from './channel';
34import { CallOptions } from './client';
35import { CallCredentials } from './call-credentials';
36import { ClientMethodDefinition } from './make-client';
37import { getErrorMessage } from './error';
38
/**
 * Error raised for an invalid interceptor configuration: interceptors and
 * interceptor providers were both supplied, either to a client constructor or
 * as call options.
 */
export class InterceptorConfigurationError extends Error {
  /**
   * @param message Human-readable description of the configuration problem.
   */
  constructor(message: string) {
    super(message);
    this.name = 'InterceptorConfigurationError';
    // Passing the constructor as the second argument keeps this constructor's
    // own frame out of the captured stack trace.
    Error.captureStackTrace(this, InterceptorConfigurationError);
  }
}
50
/**
 * Signature of a requester's `start` hook. It receives the outgoing metadata
 * and the listener for the call, and hands the (possibly replaced) metadata
 * and listener on by invoking `next`.
 */
export interface MetadataRequester {
  (
    metadata: Metadata,
    listener: InterceptingListener,
    next: (
      metadata: Metadata,
      listener: InterceptingListener | Listener
    ) => void
  ): void;
}
61
/**
 * Signature of a requester's `sendMessage` hook. It receives an outgoing
 * message and hands the (possibly replaced) message on by invoking `next`.
 */
export interface MessageRequester {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
66
/**
 * Signature of a requester's `halfClose` hook. Invoking `next` lets the
 * half-close proceed.
 */
export interface CloseRequester {
  (next: () => void): void;
}
70
/**
 * Signature of a requester's `cancel` hook. Invoking `next` lets the
 * cancellation proceed.
 */
export interface CancelRequester {
  (next: () => void): void;
}
74
/**
 * An object with methods for intercepting and modifying outgoing call
 * operations. Every hook is present; see `Requester` for the variant in which
 * each hook is optional.
 */
export interface FullRequester {
  /** Hook applied to the outgoing metadata and listener when a call starts. */
  start: MetadataRequester;
  /** Hook applied to each outgoing message. */
  sendMessage: MessageRequester;
  /** Hook applied when the call is half-closed. */
  halfClose: CloseRequester;
  /** Hook applied when the call is cancelled. */
  cancel: CancelRequester;
}
84
/** A requester in which any subset of the `FullRequester` hooks may be omitted. */
export type Requester = Partial<FullRequester>;
86
/**
 * Fluent builder for `Listener` objects. Each `with…` method records one
 * handler and returns the builder so calls can be chained; handlers that are
 * never set are emitted as `undefined` by `build()`.
 */
export class ListenerBuilder {
  private metadataHandler: MetadataListener | undefined = undefined;
  private messageHandler: MessageListener | undefined = undefined;
  private statusHandler: StatusListener | undefined = undefined;

  /** Records the handler for received metadata. */
  withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
    this.metadataHandler = onReceiveMetadata;
    return this;
  }

  /** Records the handler for received messages. */
  withOnReceiveMessage(onReceiveMessage: MessageListener): this {
    this.messageHandler = onReceiveMessage;
    return this;
  }

  /** Records the handler for the received status. */
  withOnReceiveStatus(onReceiveStatus: StatusListener): this {
    this.statusHandler = onReceiveStatus;
    return this;
  }

  /** Returns a new `Listener` holding the handlers recorded so far. */
  build(): Listener {
    const listener: Listener = {
      onReceiveMetadata: this.metadataHandler,
      onReceiveMessage: this.messageHandler,
      onReceiveStatus: this.statusHandler,
    };
    return listener;
  }
}
115
/**
 * Fluent builder for `Requester` objects. Each `with…` method records one
 * hook and returns the builder so calls can be chained; hooks that are never
 * set are emitted as `undefined` by `build()`.
 */
export class RequesterBuilder {
  private startHook: MetadataRequester | undefined = undefined;
  private sendMessageHook: MessageRequester | undefined = undefined;
  private halfCloseHook: CloseRequester | undefined = undefined;
  private cancelHook: CancelRequester | undefined = undefined;

  /** Records the hook applied when a call starts. */
  withStart(start: MetadataRequester): this {
    this.startHook = start;
    return this;
  }

  /** Records the hook applied to each outgoing message. */
  withSendMessage(sendMessage: MessageRequester): this {
    this.sendMessageHook = sendMessage;
    return this;
  }

  /** Records the hook applied when the call is half-closed. */
  withHalfClose(halfClose: CloseRequester): this {
    this.halfCloseHook = halfClose;
    return this;
  }

  /** Records the hook applied when the call is cancelled. */
  withCancel(cancel: CancelRequester): this {
    this.cancelHook = cancel;
    return this;
  }

  /** Returns a new `Requester` holding the hooks recorded so far. */
  build(): Requester {
    const requester: Requester = {
      start: this.startHook,
      sendMessage: this.sendMessageHook,
      halfClose: this.halfCloseHook,
      cancel: this.cancelHook,
    };
    return requester;
  }
}
151
/**
 * Pass-through listener: every handler forwards its argument to `next`
 * unchanged. Used to fill in the handlers a partial Listener leaves out.
 */
const defaultListener: FullListener = {
  onReceiveMetadata(metadata, next) {
    next(metadata);
  },
  onReceiveMessage(message, next) {
    next(message);
  },
  onReceiveStatus(status, next) {
    next(status);
  },
};
167
/**
 * Pass-through requester: every hook forwards its arguments to `next`
 * unchanged. Used to fill in the hooks a partial Requester leaves out.
 */
const defaultRequester: FullRequester = {
  start(metadata, listener, next) {
    next(metadata, listener);
  },
  sendMessage(message, next) {
    next(message);
  },
  halfClose(next) {
    next();
  },
  cancel(next) {
    next();
  },
};
186
/**
 * The options object handed to each interceptor: the call options plus the
 * definition of the method being invoked.
 */
export interface InterceptorOptions extends CallOptions {
  /** Definition of the method the call is being made to. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  method_definition: ClientMethodDefinition<any, any>;
}
191
/**
 * The set of call operations every link of an interceptor chain exposes, from
 * the outermost `InterceptingCall` down to the call that owns the channel's
 * underlying call object.
 */
export interface InterceptingCallInterface {
  /** Cancels the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns the peer string of the call. */
  getPeer(): string;
  /** Starts the call with the given metadata and optional listener. */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
  /** Sends a message together with a message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void;
  /** Sends a message. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void;
  /** Starts reading from the call. */
  startRead(): void;
  /** Half-closes the call. */
  halfClose(): void;
}
203
/**
 * A call that routes each outgoing operation through a requester before
 * forwarding it to the next call in the interceptor chain.
 *
 * A requester hook may invoke its `next` callback later than the operation
 * that triggered it, so this class tracks in-flight work with boolean flags:
 * a message whose hook completes while the start metadata is still being
 * processed is held until `start` has been forwarded, and a half-close
 * requested while metadata or a message is still being processed is deferred
 * until that work has been forwarded. Only a single held message and a single
 * "message in flight" flag are tracked.
 */
export class InterceptingCall implements InterceptingCallInterface {
  /**
   * The requester that this InterceptingCall uses to modify outgoing operations
   */
  private requester: FullRequester;
  /**
   * Indicates that metadata has been passed to the requester's start
   * method but it has not been passed to the corresponding next callback
   */
  private processingMetadata = false;
  /**
   * Message context for a pending message that is waiting for the start
   * metadata to be forwarded to the next call. `null` when no message is
   * waiting.
   */
  private pendingMessageContext: MessageContext | null = null;
  /**
   * The message, as produced by the requester's sendMessage hook, that goes
   * with `pendingMessageContext`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;
  /**
   * Indicates that a message has been passed to the requester's sendMessage
   * method but it has not been passed to the corresponding next callback
   */
  private processingMessage = false;
  /**
   * Indicates that a half-close was requested but could not be forwarded
   * because metadata or a message was still being processed.
   */
  private pendingHalfClose = false;

  /**
   * @param nextCall The next call in the chain; receives every forwarded
   *     operation.
   * @param requester Optional hooks for outgoing operations. Any hook that is
   *     missing is replaced by the pass-through default.
   */
  constructor(
    private nextCall: InterceptingCallInterface,
    requester?: Requester
  ) {
    if (requester) {
      this.requester = {
        start: requester.start ?? defaultRequester.start,
        sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
        halfClose: requester.halfClose ?? defaultRequester.halfClose,
        cancel: requester.cancel ?? defaultRequester.cancel,
      };
    } else {
      this.requester = defaultRequester;
    }
  }

  /** Runs the cancel hook, then cancels the next call with the given status. */
  cancelWithStatus(status: Status, details: string) {
    this.requester.cancel(() => {
      this.nextCall.cancelWithStatus(status, details);
    });
  }

  /** Returns the peer reported by the next call. */
  getPeer() {
    return this.nextCall.getPeer();
  }

  /** Forwards the held message, if any, and clears the holding slot. */
  private processPendingMessage() {
    if (this.pendingMessageContext) {
      this.nextCall.sendMessageWithContext(
        this.pendingMessageContext,
        this.pendingMessage
      );
      this.pendingMessageContext = null;
      this.pendingMessage = null;
    }
  }

  /**
   * Forwards a deferred half-close once nothing is in flight any more. The
   * flag is cleared before forwarding so the half-close is sent exactly once,
   * and an in-flight message keeps it deferred so the half-close cannot
   * overtake that message.
   */
  private processPendingHalfClose() {
    if (
      this.pendingHalfClose &&
      !this.processingMetadata &&
      !this.processingMessage
    ) {
      this.pendingHalfClose = false;
      this.nextCall.halfClose();
    }
  }

  /**
   * Starts the call: passes the metadata and a fully populated listener to
   * the requester's start hook and, once that hook calls `next`, starts the
   * next call and flushes any message or half-close that was deferred in the
   * meantime.
   *
   * @param metadata Outgoing metadata.
   * @param interceptingListener Listener for incoming events; missing
   *     handlers are replaced by no-ops.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    const fullInterceptingListener: InterceptingListener = {
      onReceiveMetadata:
        interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
        (() => {}),
      onReceiveMessage:
        interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
        (() => {}),
      onReceiveStatus:
        interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
        (() => {}),
    };
    this.processingMetadata = true;
    this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
      this.processingMetadata = false;
      let finalInterceptingListener: InterceptingListener;
      if (isInterceptingListener(listener)) {
        finalInterceptingListener = listener;
      } else {
        // A plain Listener may omit handlers; fill the gaps with the
        // pass-through defaults and chain it in front of our own listener.
        const fullListener: FullListener = {
          onReceiveMetadata:
            listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
          onReceiveMessage:
            listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
          onReceiveStatus:
            listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
        };
        finalInterceptingListener = new InterceptingListenerImpl(
          fullListener,
          fullInterceptingListener
        );
      }
      this.nextCall.start(md, finalInterceptingListener);
      this.processPendingMessage();
      this.processPendingHalfClose();
    });
  }

  /**
   * Runs the sendMessage hook on `message` and forwards the hook's result.
   * If the start metadata is still being processed when the hook completes,
   * the result is held and forwarded right after the next call is started.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    this.processingMessage = true;
    this.requester.sendMessage(message, (finalMessage) => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessageContext = context;
        // Hold the hook's output, not its input, so the deferred path sends
        // the same payload as the immediate path below.
        this.pendingMessage = finalMessage;
      } else {
        this.nextCall.sendMessageWithContext(context, finalMessage);
        this.processPendingHalfClose();
      }
    });
  }

  /** Sends `message` with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void {
    this.sendMessageWithContext({}, message);
  }

  /** Forwards the read request to the next call. */
  startRead(): void {
    this.nextCall.startRead();
  }

  /**
   * Runs the halfClose hook, then half-closes the next call. If metadata or a
   * message is still being processed at that point, the half-close is
   * deferred until that work has been forwarded.
   */
  halfClose(): void {
    this.requester.halfClose(() => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingHalfClose = true;
      } else {
        this.nextCall.halfClose();
      }
    });
  }
}
340
/**
 * Creates a call on `channel` for `path` from the given call options.
 *
 * A missing deadline becomes `Infinity` and a missing parent becomes `null`;
 * host and propagation flags are passed through as given. Credentials, when
 * present, are attached to the new call before it is returned.
 */
function getCall(channel: Channel, path: string, options: CallOptions): Call {
  const { host, credentials } = options;
  const created = channel.createCall(
    path,
    options.deadline ?? Infinity,
    host,
    options.parent ?? null,
    options.propagate_flags
  );
  if (credentials) {
    created.setCredentials(credentials);
  }
  return created;
}
353
/**
 * InterceptingCall implementation that directly owns the underlying Call
 * object and handles serialization and deserialization.
 *
 * Outgoing messages are serialized with the method definition before being
 * handed to the call; incoming messages are deserialized before being handed
 * to the listener. A failure in either direction cancels the call with
 * `Status.INTERNAL`.
 */
class BaseInterceptingCall implements InterceptingCallInterface {
  /**
   * @param call The underlying call that operations are delegated to.
   * @param methodDefinition Supplies the request serializer and the response
   *     deserializer.
   */
  constructor(
    protected call: Call,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    protected methodDefinition: ClientMethodDefinition<any, any>
  ) {}

  /** Cancels the underlying call with the given status. */
  cancelWithStatus(status: Status, details: string): void {
    this.call.cancelWithStatus(status, details);
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /**
   * Serializes `message` and sends it on the underlying call. If the
   * serializer throws, the call is cancelled instead and nothing is sent.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    let payload: Buffer;
    try {
      payload = this.methodDefinition.requestSerialize(message);
    } catch (e) {
      this.call.cancelWithStatus(
        Status.INTERNAL,
        `Request message serialization failure: ${getErrorMessage(e)}`
      );
      return;
    }
    this.call.sendMessageWithContext(context, payload);
  }

  /** Sends `message` with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any) {
    this.sendMessageWithContext({}, message);
  }

  /**
   * Starts the underlying call and relays its events to
   * `interceptingListener`, deserializing each message on the way. When a
   * message fails to deserialize, the call is cancelled, that message is not
   * delivered, and the parsing error replaces the status the call reports.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // Set once a response fails to parse; takes precedence over the status
    // the underlying call reports afterwards.
    let parseFailure: StatusObject | null = null;
    this.call.start(metadata, {
      onReceiveMetadata: (received) => {
        interceptingListener?.onReceiveMetadata?.(received);
      },
      onReceiveMessage: (raw) => {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        let parsed: any;
        try {
          parsed = this.methodDefinition.responseDeserialize(raw);
        } catch (e) {
          parseFailure = {
            code: Status.INTERNAL,
            details: `Response message parsing error: ${getErrorMessage(e)}`,
            metadata: new Metadata(),
          };
          this.call.cancelWithStatus(parseFailure.code, parseFailure.details);
          return;
        }
        interceptingListener?.onReceiveMessage?.(parsed);
      },
      onReceiveStatus: (status) => {
        interceptingListener?.onReceiveStatus?.(parseFailure ?? status);
      },
    });
  }

  /** Starts reading from the underlying call. */
  startRead() {
    this.call.startRead();
  }

  /** Half-closes the underlying call. */
  halfClose(): void {
    this.call.halfClose();
  }
}
429
/**
 * BaseInterceptingCall with special-cased behavior for methods with unary
 * responses: reading is started automatically when the call starts, and if
 * the call ends without any message having arrived, the listener is given a
 * `null` message before it is given the status.
 */
class BaseUnaryInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {
  /**
   * Starts the call with a wrapper around `listener` that tracks whether a
   * message arrived, then immediately starts reading from the call.
   */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
    let sawMessage = false;
    const forwardMetadata =
      listener?.onReceiveMetadata?.bind(listener) ?? (() => {});
    const trackingListener: InterceptingListener = {
      onReceiveMetadata: forwardMetadata,
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any) => {
        sawMessage = true;
        listener?.onReceiveMessage?.(message);
      },
      onReceiveStatus: (status: StatusObject) => {
        // Deliver a null message first when the call ended without one.
        if (!sawMessage) {
          listener?.onReceiveMessage?.(null);
        }
        listener?.onReceiveStatus?.(status);
      },
    };
    super.start(metadata, trackingListener);
    this.call.startRead();
  }
}
462
/**
 * BaseInterceptingCall used for methods with streaming responses. It adds
 * nothing to the base class; every operation is inherited unchanged.
 */
class BaseStreamingInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {}
470
/**
 * Builds the innermost call of an interceptor chain: creates the underlying
 * call on `channel` for the method's path and wraps it in the streaming or
 * unary base call, depending on `methodDefinition.responseStream`.
 */
function getBottomInterceptingCall(
  channel: Channel,
  options: InterceptorOptions,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>
) {
  const underlyingCall = getCall(channel, methodDefinition.path, options);
  return methodDefinition.responseStream
    ? new BaseStreamingInterceptingCall(underlyingCall, methodDefinition)
    : new BaseUnaryInterceptingCall(underlyingCall, methodDefinition);
}
484
/**
 * Function handed to an interceptor; calling it with options produces the
 * next call in the chain.
 */
export interface NextCall {
  (options: InterceptorOptions): InterceptingCallInterface;
}
488
/**
 * A client interceptor: given the options for a call and a `nextCall`
 * function for obtaining the next call in the chain, returns the
 * `InterceptingCall` to use.
 */
export interface Interceptor {
  (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}
492
/**
 * Function that supplies an interceptor for a given method definition.
 * `getInterceptingCall` discards falsy results, so a provider can opt out of
 * a method at runtime by returning nothing.
 */
export interface InterceptorProvider {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
497
/**
 * The interceptor configuration consumed by `getInterceptingCall`, split by
 * where it was supplied (client constructor vs. call options) and by form
 * (interceptors vs. interceptor providers).
 */
export interface InterceptorArguments {
  /** Interceptors passed to the client constructor. */
  clientInterceptors: Interceptor[];
  /** Interceptor providers passed to the client constructor. */
  clientInterceptorProviders: InterceptorProvider[];
  /** Interceptors passed as call options. */
  callInterceptors: Interceptor[];
  /** Interceptor providers passed as call options. */
  callInterceptorProviders: InterceptorProvider[];
}
504
/**
 * Builds the interceptor chain for one call and returns its outermost call.
 *
 * Call-level interceptors (or providers), when any are given, replace the
 * client-level ones entirely. Providers are invoked with `methodDefinition`
 * and falsy results are dropped. Interceptors run in list order, each
 * receiving a `nextCall` function that produces the rest of the chain; the
 * end of the chain is the base call that owns the channel's underlying call.
 *
 * @param interceptorArgs Interceptors and providers from the client and from
 *     the call options.
 * @param methodDefinition Definition of the method being called.
 * @param options Call options; passed to the interceptors with
 *     `method_definition` added.
 * @param channel Channel on which the underlying call is created.
 * @returns The outermost call of the chain.
 * @throws InterceptorConfigurationError if interceptors and providers are
 *     both given at the client level, or both given at the call level.
 */
export function getInterceptingCall(
  interceptorArgs: InterceptorArguments,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>,
  options: CallOptions,
  channel: Channel
): InterceptingCallInterface {
  const hasClientInterceptors = interceptorArgs.clientInterceptors.length > 0;
  const hasClientProviders =
    interceptorArgs.clientInterceptorProviders.length > 0;
  if (hasClientInterceptors && hasClientProviders) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as options ' +
        'to the client constructor. Only one of these is allowed.'
    );
  }
  const hasCallInterceptors = interceptorArgs.callInterceptors.length > 0;
  const hasCallProviders = interceptorArgs.callInterceptorProviders.length > 0;
  if (hasCallInterceptors && hasCallProviders) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as call ' +
        'options. Only one of these is allowed.'
    );
  }

  // Interceptors passed to the call override interceptors passed to the
  // client constructor.
  const useCallLevel = hasCallInterceptors || hasCallProviders;
  const explicitInterceptors = useCallLevel
    ? interceptorArgs.callInterceptors
    : interceptorArgs.clientInterceptors;
  const providers = useCallLevel
    ? interceptorArgs.callInterceptorProviders
    : interceptorArgs.clientInterceptorProviders;

  // Explicit interceptors first, then provider results; falsy entries (for
  // example a provider that returned nothing) are skipped.
  const interceptors: Interceptor[] = [];
  for (const candidate of explicitInterceptors) {
    if (candidate) {
      interceptors.push(candidate);
    }
  }
  for (const provided of providers.map((provider) =>
    provider(methodDefinition)
  )) {
    if (provided) {
      interceptors.push(provided);
    }
  }

  const interceptorOptions = {
    ...options,
    method_definition: methodDefinition,
  };

  /* Build the chain back to front. The innermost nextCall creates the base
   * call, which handles (de)serialization and gets the underlying call from
   * the channel. Each interceptor, from last to first, is then wrapped
   * around the nextCall built so far. */
  let buildCall: NextCall = (finalOptions: InterceptorOptions) =>
    getBottomInterceptingCall(channel, finalOptions, methodDefinition);
  for (let index = interceptors.length - 1; index >= 0; index--) {
    const interceptor = interceptors[index];
    const downstream = buildCall;
    buildCall = (currentOptions) => interceptor(currentOptions, downstream);
  }
  return buildCall(interceptorOptions);
}