UNPKG

17.8 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { Metadata } from './metadata';
19import {
20 StatusObject,
21 Listener,
22 MetadataListener,
23 MessageListener,
24 StatusListener,
25 FullListener,
26 InterceptingListener,
27 InterceptingListenerImpl,
28 isInterceptingListener,
29 MessageContext,
30 Call,
31} from './call-interface';
32import { Status } from './constants';
33import { Channel } from './channel';
34import { CallOptions } from './client';
35import { ClientMethodDefinition } from './make-client';
36import { getErrorMessage } from './error';
37
/**
 * Error raised when a client constructor or a set of call options is given
 * both interceptors and interceptor providers at the same time.
 */
export class InterceptorConfigurationError extends Error {
  // Initialized right after super() returns, i.e. before the stack trace is
  // captured below.
  name = 'InterceptorConfigurationError';

  /**
   * @param message Human-readable description of the configuration problem.
   */
  constructor(message: string) {
    super(message);
    // Omit this constructor frame from the captured stack trace.
    Error.captureStackTrace(this, InterceptorConfigurationError);
  }
}
49
/**
 * Signature of the `start` hook of a requester. The hook receives the
 * outgoing metadata and the listener for the call, and continues the call by
 * invoking `next` with the metadata and listener it wants passed on.
 */
export interface MetadataRequester {
  (
    metadata: Metadata,
    listener: InterceptingListener,
    // `next` accepts either an InterceptingListener or a plain Listener.
    next: (
      metadata: Metadata,
      listener: InterceptingListener | Listener
    ) => void
  ): void;
}
60
/**
 * Signature of the `sendMessage` hook of a requester. The hook receives the
 * outgoing message and continues by invoking `next` with the message that
 * should be passed on.
 */
export interface MessageRequester {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
65
/**
 * Signature of the `halfClose` hook of a requester. The hook continues the
 * half-close by invoking `next`.
 */
export interface CloseRequester {
  (next: () => void): void;
}
69
/**
 * Signature of the `cancel` hook of a requester. The hook continues the
 * cancellation by invoking `next`.
 */
export interface CancelRequester {
  (next: () => void): void;
}
73
/**
 * An object with methods for intercepting and modifying outgoing call
 * operations. Every hook is required here.
 */
export interface FullRequester {
  /** Hook applied to the outgoing metadata and listener when a call starts. */
  start: MetadataRequester;
  /** Hook applied to each outgoing message. */
  sendMessage: MessageRequester;
  /** Hook applied when the call is half-closed. */
  halfClose: CloseRequester;
  /** Hook applied when the call is cancelled. */
  cancel: CancelRequester;
}
83
/** A requester in which any subset of the hooks may be omitted. */
export type Requester = Partial<FullRequester>;
85
/**
 * Fluent builder for a Listener. Each `with…` method records one callback
 * and returns the builder so calls can be chained; callbacks that were never
 * supplied are present but `undefined` in the built Listener.
 */
export class ListenerBuilder {
  private metadataHandler: MetadataListener | undefined = undefined;
  private messageHandler: MessageListener | undefined = undefined;
  private statusHandler: StatusListener | undefined = undefined;

  /** Records the callback to use for received metadata. */
  withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
    this.metadataHandler = onReceiveMetadata;
    return this;
  }

  /** Records the callback to use for received messages. */
  withOnReceiveMessage(onReceiveMessage: MessageListener): this {
    this.messageHandler = onReceiveMessage;
    return this;
  }

  /** Records the callback to use for the received status. */
  withOnReceiveStatus(onReceiveStatus: StatusListener): this {
    this.statusHandler = onReceiveStatus;
    return this;
  }

  /** Returns a new Listener object holding the callbacks recorded so far. */
  build(): Listener {
    const { metadataHandler, messageHandler, statusHandler } = this;
    return {
      onReceiveMetadata: metadataHandler,
      onReceiveMessage: messageHandler,
      onReceiveStatus: statusHandler,
    };
  }
}
114
/**
 * Fluent builder for a Requester. Each `with…` method records one hook and
 * returns the builder so calls can be chained; hooks that were never
 * supplied are present but `undefined` in the built Requester.
 */
export class RequesterBuilder {
  private startHook: MetadataRequester | undefined = undefined;
  private sendMessageHook: MessageRequester | undefined = undefined;
  private halfCloseHook: CloseRequester | undefined = undefined;
  private cancelHook: CancelRequester | undefined = undefined;

  /** Records the hook to use for `start`. */
  withStart(start: MetadataRequester): this {
    this.startHook = start;
    return this;
  }

  /** Records the hook to use for `sendMessage`. */
  withSendMessage(sendMessage: MessageRequester): this {
    this.sendMessageHook = sendMessage;
    return this;
  }

  /** Records the hook to use for `halfClose`. */
  withHalfClose(halfClose: CloseRequester): this {
    this.halfCloseHook = halfClose;
    return this;
  }

  /** Records the hook to use for `cancel`. */
  withCancel(cancel: CancelRequester): this {
    this.cancelHook = cancel;
    return this;
  }

  /** Returns a new Requester object holding the hooks recorded so far. */
  build(): Requester {
    const { startHook, sendMessageHook, halfCloseHook, cancelHook } = this;
    return {
      start: startHook,
      sendMessage: sendMessageHook,
      halfClose: halfCloseHook,
      cancel: cancelHook,
    };
  }
}
150
/**
 * Pass-through Listener: every callback hands its argument straight to
 * `next`. Used to fill in the callbacks that a partial Listener omits.
 */
const defaultListener: FullListener = {
  onReceiveMetadata(receivedMetadata, forward) {
    forward(receivedMetadata);
  },
  onReceiveMessage(receivedMessage, forward) {
    forward(receivedMessage);
  },
  onReceiveStatus(receivedStatus, forward) {
    forward(receivedStatus);
  },
};
166
/**
 * Pass-through Requester: every hook immediately calls `next` with the
 * arguments it was given. Used to fill in the hooks that a partial Requester
 * omits.
 */
const defaultRequester: FullRequester = {
  start(outgoingMetadata, callListener, proceed) {
    proceed(outgoingMetadata, callListener);
  },
  sendMessage(outgoingMessage, proceed) {
    proceed(outgoingMessage);
  },
  halfClose(proceed) {
    proceed();
  },
  cancel(proceed) {
    proceed();
  },
};
185
/**
 * Call options as seen by interceptors: the regular CallOptions plus the
 * definition of the method being called.
 */
export interface InterceptorOptions extends CallOptions {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  method_definition: ClientMethodDefinition<any, any>;
}
190
/**
 * The set of call operations that every link in an interceptor chain
 * exposes to the link above it.
 */
export interface InterceptingCallInterface {
  cancelWithStatus(status: Status, details: string): void;
  getPeer(): string;
  // The listener is optional, and so is each of its callbacks.
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void;
  startRead(): void;
  halfClose(): void;
}
202
/**
 * One link in an interceptor chain. Outgoing operations are first handed to
 * this link's requester and then forwarded to `nextCall` once the requester
 * invokes the corresponding `next` callback.
 *
 * A requester may invoke its `next` callbacks asynchronously, so a message or
 * a half-close can become ready while an earlier operation is still being
 * processed. Those are held back and forwarded in order: metadata first, then
 * the message, then the half-close.
 *
 * Only a single pending message is tracked at a time.
 */
export class InterceptingCall implements InterceptingCallInterface {
  /**
   * The requester that this InterceptingCall uses to modify outgoing operations
   */
  private requester: FullRequester;
  /**
   * Indicates that metadata has been passed to the requester's start
   * method but it has not been passed to the corresponding next callback
   */
  private processingMetadata = false;
  /**
   * Message context for a pending message that is waiting for the metadata
   * to finish being processed before it can be forwarded to the next call.
   */
  private pendingMessageContext: MessageContext | null = null;
  /**
   * The already-intercepted message that belongs to pendingMessageContext.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;
  /**
   * Indicates that a message has been passed to the requester's sendMessage
   * method but it has not been passed to the corresponding next callback
   */
  private processingMessage = false;
  /**
   * Indicates that a half-close was requested but could not be forwarded
   * because metadata or a message was still being processed.
   */
  private pendingHalfClose = false;

  /**
   * @param nextCall The next link in the chain; receives every operation
   *     after this link's requester has let it through.
   * @param requester Optional hooks for outgoing operations. Omitted hooks
   *     fall back to the pass-through implementations in defaultRequester.
   */
  constructor(
    private nextCall: InterceptingCallInterface,
    requester?: Requester
  ) {
    if (requester) {
      this.requester = {
        start: requester.start ?? defaultRequester.start,
        sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
        halfClose: requester.halfClose ?? defaultRequester.halfClose,
        cancel: requester.cancel ?? defaultRequester.cancel,
      };
    } else {
      this.requester = defaultRequester;
    }
  }

  /** Runs the cancel hook, then cancels the next call with the given status. */
  cancelWithStatus(status: Status, details: string) {
    this.requester.cancel(() => {
      this.nextCall.cancelWithStatus(status, details);
    });
  }

  /** Returns the peer reported by the next call. */
  getPeer() {
    return this.nextCall.getPeer();
  }

  /** Forwards the buffered message, if there is one, and clears the buffer. */
  private processPendingMessage() {
    if (this.pendingMessageContext) {
      this.nextCall.sendMessageWithContext(
        this.pendingMessageContext,
        this.pendingMessage
      );
      this.pendingMessageContext = null;
      this.pendingMessage = null;
    }
  }

  /**
   * Forwards a deferred half-close once nothing is in flight any more. The
   * flag is cleared before forwarding so that the half-close reaches the next
   * call at most once, and the in-flight checks keep it from overtaking a
   * message whose sendMessage hook has not completed yet.
   */
  private processPendingHalfClose() {
    if (
      this.pendingHalfClose &&
      !this.processingMetadata &&
      !this.processingMessage
    ) {
      this.pendingHalfClose = false;
      this.nextCall.halfClose();
    }
  }

  /**
   * Starts the call: runs the start hook and, once it calls `next`, starts
   * the next call and flushes any message or half-close that was deferred in
   * the meantime.
   *
   * @param metadata Outgoing metadata handed to the start hook.
   * @param interceptingListener Optional listener; missing callbacks are
   *     replaced with no-ops.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // NOTE(review): the no-op fallbacks deliberately declare one parameter;
    // isInterceptingListener presumably tells listener kinds apart by
    // callback arity - confirm before changing their signatures.
    const fullInterceptingListener: InterceptingListener = {
      onReceiveMetadata:
        interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
        (metadata => {}),
      onReceiveMessage:
        interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
        (message => {}),
      onReceiveStatus:
        interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
        (status => {}),
    };
    this.processingMetadata = true;
    this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
      this.processingMetadata = false;
      let finalInterceptingListener: InterceptingListener;
      if (isInterceptingListener(listener)) {
        finalInterceptingListener = listener;
      } else {
        // A plain Listener was supplied: fill in missing callbacks with
        // pass-throughs and chain it in front of the caller's listener.
        const fullListener: FullListener = {
          onReceiveMetadata:
            listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
          onReceiveMessage:
            listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
          onReceiveStatus:
            listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
        };
        finalInterceptingListener = new InterceptingListenerImpl(
          fullListener,
          fullInterceptingListener
        );
      }
      this.nextCall.start(md, finalInterceptingListener);
      this.processPendingMessage();
      this.processPendingHalfClose();
    });
  }

  /**
   * Runs the sendMessage hook and forwards the resulting message. If the
   * metadata is still being processed when the hook completes, the
   * intercepted message is buffered until the call has started.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    this.processingMessage = true;
    this.requester.sendMessage(message, finalMessage => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessageContext = context;
        // Buffer what the hook produced, not the hook's input, so the
        // interceptor's changes survive the deferral.
        this.pendingMessage = finalMessage;
      } else {
        this.nextCall.sendMessageWithContext(context, finalMessage);
        this.processPendingHalfClose();
      }
    });
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void {
    this.sendMessageWithContext({}, message);
  }

  /** Forwards the read request directly to the next call. */
  startRead(): void {
    this.nextCall.startRead();
  }

  /**
   * Runs the halfClose hook, then half-closes the next call, or defers the
   * half-close while metadata or a message is still being processed.
   */
  halfClose(): void {
    this.requester.halfClose(() => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingHalfClose = true;
      } else {
        this.nextCall.halfClose();
      }
    });
  }
}
342
/**
 * Creates the underlying call on the channel from the given call options.
 *
 * @param channel Channel on which the call is created.
 * @param path Method path passed to `channel.createCall`.
 * @param options Call options; a missing deadline becomes `Infinity` and a
 *     missing parent becomes `null`.
 * @returns The new call, with credentials applied when `options.credentials`
 *     is set.
 */
function getCall(channel: Channel, path: string, options: CallOptions): Call {
  const {
    deadline,
    host,
    parent,
    propagate_flags: propagateFlags,
    credentials,
  } = options;
  const createdCall = channel.createCall(
    path,
    deadline ?? Infinity,
    host,
    parent ?? null,
    propagateFlags
  );
  if (!credentials) {
    return createdCall;
  }
  createdCall.setCredentials(credentials);
  return createdCall;
}
355
/**
 * InterceptingCall implementation that directly owns the underlying Call
 * object and handles serialization and deserialization.
 */
class BaseInterceptingCall implements InterceptingCallInterface {
  constructor(
    protected call: Call,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    protected methodDefinition: ClientMethodDefinition<any, any>
  ) {}

  /** Cancels the underlying call with the given status. */
  cancelWithStatus(status: Status, details: string): void {
    this.call.cancelWithStatus(status, details);
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /**
   * Serializes the message and sends it on the underlying call. If
   * serialization throws, the call is cancelled with INTERNAL and nothing is
   * sent.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    let payload: Buffer;
    try {
      payload = this.methodDefinition.requestSerialize(message);
    } catch (err) {
      this.call.cancelWithStatus(
        Status.INTERNAL,
        `Request message serialization failure: ${getErrorMessage(err)}`
      );
      return;
    }
    this.call.sendMessageWithContext(context, payload);
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any) {
    this.sendMessageWithContext({}, message);
  }

  /**
   * Starts the underlying call. Incoming messages are deserialized before
   * they reach the listener; when deserialization fails the call is cancelled
   * and the parse error is reported in place of the status that arrives
   * afterwards.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // Set on the first deserialization failure; replaces the final status.
    let parseFailure: StatusObject | null = null;
    this.call.start(metadata, {
      onReceiveMetadata: receivedMetadata => {
        interceptingListener?.onReceiveMetadata?.(receivedMetadata);
      },
      onReceiveMessage: rawMessage => {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        let parsed: any;
        try {
          parsed = this.methodDefinition.responseDeserialize(rawMessage);
        } catch (err) {
          const failure: StatusObject = {
            code: Status.INTERNAL,
            details: `Response message parsing error: ${getErrorMessage(err)}`,
            metadata: new Metadata(),
          };
          parseFailure = failure;
          this.call.cancelWithStatus(failure.code, failure.details);
          return;
        }
        interceptingListener?.onReceiveMessage?.(parsed);
      },
      onReceiveStatus: finalStatus => {
        interceptingListener?.onReceiveStatus?.(parseFailure ?? finalStatus);
      },
    });
  }

  /** Requests a read on the underlying call. */
  startRead() {
    this.call.startRead();
  }

  /** Half-closes the underlying call. */
  halfClose(): void {
    this.call.halfClose();
  }
}
431
/**
 * BaseInterceptingCall with special-cased behavior for methods with unary
 * responses: the read is requested as soon as the call starts, and the
 * listener always gets a message callback before the status.
 */
class BaseUnaryInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface
{
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  constructor(call: Call, methodDefinition: ClientMethodDefinition<any, any>) {
    super(call, methodDefinition);
  }

  /**
   * Starts the call with a wrapper around `listener` and immediately requests
   * a read. If the status arrives before any message, the wrapper first
   * reports a `null` message.
   */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
    let sawMessage = false;
    // The fallback keeps a single declared parameter, like the bound callback.
    const forwardMetadata =
      listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {});
    const unaryListener: InterceptingListener = {
      onReceiveMetadata: forwardMetadata,
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any) => {
        sawMessage = true;
        listener?.onReceiveMessage?.(message);
      },
      onReceiveStatus: (status: StatusObject) => {
        if (!sawMessage) {
          // No response message was delivered; report null in its place.
          listener?.onReceiveMessage?.(null);
        }
        listener?.onReceiveStatus?.(status);
      },
    };
    super.start(metadata, unaryListener);
    this.call.startRead();
  }
}
465
/**
 * BaseInterceptingCall with special-cased behavior for methods with streaming
 * responses. It currently adds nothing on top of BaseInterceptingCall.
 */
class BaseStreamingInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {}
473
/**
 * Creates the call at the bottom of an interceptor chain: the underlying
 * call is obtained from the channel and wrapped in the streaming or unary
 * base class according to `methodDefinition.responseStream`.
 */
function getBottomInterceptingCall(
  channel: Channel,
  options: InterceptorOptions,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>
) {
  const underlyingCall = getCall(channel, methodDefinition.path, options);
  return methodDefinition.responseStream
    ? new BaseStreamingInterceptingCall(underlyingCall, methodDefinition)
    : new BaseUnaryInterceptingCall(underlyingCall, methodDefinition);
}
487
/**
 * Function that an interceptor calls with call options to obtain the next
 * call in the chain.
 */
export interface NextCall {
  (options: InterceptorOptions): InterceptingCallInterface;
}
491
/**
 * A client interceptor: receives the call options and a NextCall function
 * and returns the InterceptingCall to use for the call.
 */
export interface Interceptor {
  (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}
495
/**
 * Function that returns the interceptor to use for a given method
 * definition.
 */
export interface InterceptorProvider {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
500
/**
 * The interceptors and interceptor providers configured on the client and
 * on an individual call.
 */
export interface InterceptorArguments {
  /** Interceptors passed to the client constructor. */
  clientInterceptors: Interceptor[];
  /** Interceptor providers passed to the client constructor. */
  clientInterceptorProviders: InterceptorProvider[];
  /** Interceptors passed as call options. */
  callInterceptors: Interceptor[];
  /** Interceptor providers passed as call options. */
  callInterceptorProviders: InterceptorProvider[];
}
507
/**
 * Builds the interceptor chain for one call and returns its top-most call.
 *
 * Interceptors configured on the call replace those configured on the
 * client. Within the chosen level, plain interceptors come first, followed
 * by the interceptors produced by the providers; falsy provider results are
 * dropped.
 *
 * @param interceptorArgs Client-level and call-level interceptors/providers.
 * @param methodDefinition Definition of the method being called.
 * @param options Call options; copied and extended with `method_definition`.
 * @param channel Channel used to create the underlying call.
 * @throws InterceptorConfigurationError when interceptors and providers are
 *     both present at the client level, or both present at the call level.
 */
export function getInterceptingCall(
  interceptorArgs: InterceptorArguments,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>,
  options: CallOptions,
  channel: Channel
): InterceptingCallInterface {
  const {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors,
    callInterceptorProviders,
  } = interceptorArgs;

  if (clientInterceptors.length > 0 && clientInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as options ' +
        'to the client constructor. Only one of these is allowed.'
    );
  }
  if (callInterceptors.length > 0 && callInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as call ' +
        'options. Only one of these is allowed.'
    );
  }

  // Interceptors passed to the call override interceptors passed to the
  // client constructor.
  const useCallLevel =
    callInterceptors.length > 0 || callInterceptorProviders.length > 0;
  const explicitInterceptors = useCallLevel
    ? callInterceptors
    : clientInterceptors;
  const providers = useCallLevel
    ? callInterceptorProviders
    : clientInterceptorProviders;

  // Filter out falsy values when providers return nothing.
  const interceptors = ([] as Interceptor[])
    .concat(
      explicitInterceptors,
      providers.map(provider => provider(methodDefinition))
    )
    .filter(interceptor => interceptor);

  const interceptorOptions = Object.assign({}, options, {
    method_definition: methodDefinition,
  });

  /* Build the chain from the bottom up. The innermost nextCall creates the
   * bottom intercepting call, which handles (de)serialization and obtains
   * the underlying call from the channel. Walking the list backwards, each
   * interceptor is wrapped so that the nextCall it receives leads to the
   * interceptor that follows it in the list. */
  let buildCall: NextCall = (finalOptions: InterceptorOptions) =>
    getBottomInterceptingCall(channel, finalOptions, methodDefinition);
  for (let index = interceptors.length - 1; index >= 0; index--) {
    const interceptor = interceptors[index];
    const downstream = buildCall;
    buildCall = currentOptions => interceptor(currentOptions, downstream);
  }
  return buildCall(interceptorOptions);
}