1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | import { Metadata } from './metadata';
|
19 | import {
|
20 | StatusObject,
|
21 | Listener,
|
22 | MetadataListener,
|
23 | MessageListener,
|
24 | StatusListener,
|
25 | FullListener,
|
26 | InterceptingListener,
|
27 | InterceptingListenerImpl,
|
28 | isInterceptingListener,
|
29 | MessageContext,
|
30 | Call,
|
31 | } from './call-stream';
|
32 | import { Status } from './constants';
|
33 | import { Channel } from './channel';
|
34 | import { CallOptions } from './client';
|
35 | import { CallCredentials } from './call-credentials';
|
36 | import { ClientMethodDefinition } from './make-client';
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
/**
 * Error type used to report an invalid interceptor configuration.
 *
 * The `name` property is set to `'InterceptorConfigurationError'` and the
 * stack trace is captured with this constructor as the cut-off frame.
 */
export class InterceptorConfigurationError extends Error {
  /**
   * @param message Human-readable description of the configuration problem.
   */
  constructor(message: string) {
    super(message);
    // Name is assigned before the stack is (re)captured, as in the original
    // ordering.
    this.name = 'InterceptorConfigurationError';
    Error.captureStackTrace(this, InterceptorConfigurationError);
  }
}
|
49 |
|
/**
 * Hook invoked when a call is started.
 *
 * It receives the outgoing metadata and the listener for the call, and
 * continues the call by invoking `next` with the metadata and listener that
 * should be passed on (either a full intercepting listener or a plain
 * `Listener`).
 */
export interface MetadataRequester {
  (
    metadata: Metadata,
    listener: InterceptingListener,
    next: (
      metadata: Metadata,
      listener: InterceptingListener | Listener
    ) => void
  ): void;
}
|
60 |
|
/**
 * Hook invoked for each outgoing message. It continues the send by invoking
 * `next` with the message that should be passed on.
 */
export interface MessageRequester {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
|
65 |
|
/**
 * Hook invoked when the call is half-closed. It continues the half-close by
 * invoking `next`.
 */
export interface CloseRequester {
  (next: () => void): void;
}
|
69 |
|
/**
 * Hook invoked when the call is cancelled. It continues the cancellation by
 * invoking `next`.
 */
export interface CancelRequester {
  (next: () => void): void;
}
|
73 |
|
74 |
|
75 |
|
76 |
|
/**
 * A requester with every outbound hook present: one each for starting the
 * call, sending a message, half-closing, and cancelling.
 */
export interface FullRequester {
  /** Hook run when the call is started. */
  start: MetadataRequester;
  /** Hook run for each outgoing message. */
  sendMessage: MessageRequester;
  /** Hook run when the call is half-closed. */
  halfClose: CloseRequester;
  /** Hook run when the call is cancelled. */
  cancel: CancelRequester;
}
|
83 |
|
/**
 * A requester in which any hook may be omitted; `InterceptingCall` fills the
 * gaps with pass-through defaults.
 */
export type Requester = Partial<FullRequester>;
|
85 |
|
/**
 * Fluent builder for a `Listener`. Each `with*` method records one handler and
 * returns the builder so calls can be chained; `build()` produces the
 * listener object.
 */
export class ListenerBuilder {
  private metadata: MetadataListener | undefined = undefined;
  private message: MessageListener | undefined = undefined;
  private status: StatusListener | undefined = undefined;

  /** Records the handler used for received metadata. */
  withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
    this.metadata = onReceiveMetadata;
    return this;
  }

  /** Records the handler used for received messages. */
  withOnReceiveMessage(onReceiveMessage: MessageListener): this {
    this.message = onReceiveMessage;
    return this;
  }

  /** Records the handler used for the received status. */
  withOnReceiveStatus(onReceiveStatus: StatusListener): this {
    this.status = onReceiveStatus;
    return this;
  }

  /**
   * @returns A listener holding the recorded handlers. All three keys are
   *     always present; handlers that were never set are `undefined`.
   */
  build(): Listener {
    const { metadata, message, status } = this;
    return {
      onReceiveMetadata: metadata,
      onReceiveMessage: message,
      onReceiveStatus: status,
    };
  }
}
|
114 |
|
/**
 * Fluent builder for a `Requester`. Each `with*` method records one hook and
 * returns the builder so calls can be chained; `build()` produces the
 * requester object.
 */
export class RequesterBuilder {
  private start: MetadataRequester | undefined = undefined;
  private message: MessageRequester | undefined = undefined;
  private halfClose: CloseRequester | undefined = undefined;
  private cancel: CancelRequester | undefined = undefined;

  /** Records the hook run when the call starts. */
  withStart(start: MetadataRequester): this {
    this.start = start;
    return this;
  }

  /** Records the hook run for each outgoing message. */
  withSendMessage(sendMessage: MessageRequester): this {
    this.message = sendMessage;
    return this;
  }

  /** Records the hook run when the call is half-closed. */
  withHalfClose(halfClose: CloseRequester): this {
    this.halfClose = halfClose;
    return this;
  }

  /** Records the hook run when the call is cancelled. */
  withCancel(cancel: CancelRequester): this {
    this.cancel = cancel;
    return this;
  }

  /**
   * @returns A requester holding the recorded hooks. All four keys are always
   *     present; hooks that were never set are `undefined`.
   */
  build(): Requester {
    const { start, message, halfClose, cancel } = this;
    return {
      start,
      sendMessage: message,
      halfClose,
      cancel,
    };
  }
}
|
150 |
|
151 |
|
152 |
|
153 |
|
154 |
|
/**
 * Pass-through listener: every handler forwards the value it received to
 * `next` unchanged. Used to fill in handlers a partial `Listener` omits.
 */
const defaultListener: FullListener = {
  onReceiveMetadata(metadata, next) {
    next(metadata);
  },
  onReceiveMessage(message, next) {
    next(message);
  },
  onReceiveStatus(status, next) {
    next(status);
  },
};
|
166 |
|
167 |
|
168 |
|
169 |
|
170 |
|
/**
 * Pass-through requester: every hook immediately invokes `next` with the
 * arguments it received. Used to fill in hooks a partial `Requester` omits.
 */
const defaultRequester: FullRequester = {
  start(metadata, listener, next) {
    next(metadata, listener);
  },
  sendMessage(message, next) {
    next(message);
  },
  halfClose(next) {
    next();
  },
  cancel(next) {
    next();
  },
};
|
185 |
|
/**
 * Options handed to each interceptor: the call options plus the definition of
 * the method being invoked.
 */
export interface InterceptorOptions extends CallOptions {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  method_definition: ClientMethodDefinition<any, any>;
}
|
190 |
|
/**
 * Operations supported by every link in the interceptor chain, from the
 * outermost `InterceptingCall` down to the wrapper around the underlying call.
 */
export interface InterceptingCallInterface {
  /** Cancels the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns the peer string for the call. */
  getPeer(): string;
  /** Starts the call with the given metadata and optional listener. */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
  /** Sends a message together with an explicit message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void;
  /** Sends a message. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void;
  /** Requests a read on the call. */
  startRead(): void;
  /** Half-closes the call. */
  halfClose(): void;
  /** Sets the credentials used by the call. */
  setCredentials(credentials: CallCredentials): void;
}
|
204 |
|
/**
 * One link in the interceptor chain. Every outbound operation is first handed
 * to the matching hook of this call's requester; when the hook invokes `next`,
 * the operation is forwarded to `nextCall`.
 *
 * Requester hooks may invoke `next` asynchronously, so operations can reach
 * the forwarding step out of order. To keep `nextCall` seeing
 * start -> message -> half-close, a message that completes while the start
 * hook is still pending is buffered, and a half-close that completes while
 * the start or message hook is still pending is deferred.
 */
export class InterceptingCall implements InterceptingCallInterface {
  /** Requester with all four hooks populated. */
  private requester: FullRequester;

  /** True from `start()` until the requester's start hook calls `next`. */
  private processingMetadata = false;

  /** Context of the buffered message, or null when nothing is buffered. */
  private pendingMessageContext: MessageContext | null = null;
  /** Buffered message, already transformed by the sendMessage hook. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;

  /** True from `sendMessageWithContext()` until its hook calls `next`. */
  private processingMessage = false;

  /** True when a half-close is waiting for earlier operations to flush. */
  private pendingHalfClose = false;

  /**
   * @param nextCall The next call in the chain; receives every operation
   *     after the requester has processed it.
   * @param requester Optional hooks. Any hook that is missing (or the whole
   *     requester) falls back to a pass-through default.
   */
  constructor(
    private nextCall: InterceptingCallInterface,
    requester?: Requester
  ) {
    if (requester) {
      this.requester = {
        start: requester.start ?? defaultRequester.start,
        sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
        halfClose: requester.halfClose ?? defaultRequester.halfClose,
        cancel: requester.cancel ?? defaultRequester.cancel,
      };
    } else {
      this.requester = defaultRequester;
    }
  }

  /** Runs the cancel hook, then cancels the next call with the given status. */
  cancelWithStatus(status: Status, details: string) {
    this.requester.cancel(() => {
      this.nextCall.cancelWithStatus(status, details);
    });
  }

  /** Returns the peer reported by the next call. */
  getPeer() {
    return this.nextCall.getPeer();
  }

  /** Forwards the buffered message, if any, and clears the buffer. */
  private processPendingMessage() {
    if (this.pendingMessageContext) {
      this.nextCall.sendMessageWithContext(
        this.pendingMessageContext,
        this.pendingMessage
      );
      this.pendingMessageContext = null;
      this.pendingMessage = null;
    }
  }

  /** Forwards a deferred half-close, if any, exactly once. */
  private processPendingHalfClose() {
    if (this.pendingHalfClose) {
      // Clear the flag first so a later flush cannot half-close again.
      this.pendingHalfClose = false;
      this.nextCall.halfClose();
    }
  }

  /**
   * Starts the call. The requester's start hook receives the metadata and a
   * fully populated listener (missing handlers become no-ops). When the hook
   * calls `next`, the next call is started and any buffered message and
   * deferred half-close are flushed, in that order.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    const fullInterceptingListener: InterceptingListener = {
      onReceiveMetadata:
        interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
        ((metadata) => {}),
      onReceiveMessage:
        interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
        ((message) => {}),
      onReceiveStatus:
        interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
        ((status) => {}),
    };
    this.processingMetadata = true;
    this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
      this.processingMetadata = false;
      let finalInterceptingListener: InterceptingListener;
      if (isInterceptingListener(listener)) {
        finalInterceptingListener = listener;
      } else {
        // A plain Listener was supplied: fill in missing handlers and chain
        // it in front of the listener that was passed to this call.
        const fullListener: FullListener = {
          onReceiveMetadata:
            listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
          onReceiveMessage:
            listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
          onReceiveStatus:
            listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
        };
        finalInterceptingListener = new InterceptingListenerImpl(
          fullListener,
          fullInterceptingListener
        );
      }
      this.nextCall.start(md, finalInterceptingListener);
      this.processPendingMessage();
      this.processPendingHalfClose();
    });
  }

  /**
   * Sends a message through the sendMessage hook. If the start hook has not
   * finished when the message hook calls `next`, the hook's output is buffered
   * until the call has started; otherwise it is forwarded immediately and any
   * deferred half-close is flushed.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    this.processingMessage = true;
    this.requester.sendMessage(message, (finalMessage) => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessageContext = context;
        // Buffer the hook's output, not its input: the flush path forwards
        // the buffered value directly without re-running the hook.
        this.pendingMessage = finalMessage;
      } else {
        this.nextCall.sendMessageWithContext(context, finalMessage);
        this.processPendingHalfClose();
      }
    });
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void {
    this.sendMessageWithContext({}, message);
  }

  /** Forwards the read request to the next call. */
  startRead(): void {
    this.nextCall.startRead();
  }

  /**
   * Half-closes through the halfClose hook. If the start or message hook is
   * still pending when this hook calls `next`, the half-close is deferred
   * until they have been forwarded.
   */
  halfClose(): void {
    this.requester.halfClose(() => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingHalfClose = true;
      } else {
        this.nextCall.halfClose();
      }
    });
  }

  /** Forwards the credentials to the next call. */
  setCredentials(credentials: CallCredentials): void {
    this.nextCall.setCredentials(credentials);
  }
}
|
344 |
|
/**
 * Creates a call on `channel` for `path` using the given call options.
 *
 * A missing deadline becomes `Infinity` and a missing parent becomes `null`;
 * host and propagation flags are passed through as-is. If the options carry
 * credentials, they are set on the new call before it is returned.
 *
 * @param channel Channel used to create the call.
 * @param path Method path passed to `channel.createCall`.
 * @param options Call options supplying deadline, host, parent,
 *     propagate_flags and credentials.
 * @returns The newly created call.
 */
function getCall(channel: Channel, path: string, options: CallOptions): Call {
  const {
    deadline,
    host,
    parent,
    propagate_flags: propagateFlags,
    credentials,
  } = options;
  const created = channel.createCall(
    path,
    deadline ?? Infinity,
    host,
    parent ?? null,
    propagateFlags
  );
  if (credentials) {
    created.setCredentials(credentials);
  }
  return created;
}
|
357 |
|
358 |
|
359 |
|
360 |
|
361 |
|
/**
 * Produces a readable description of a caught value: the `message` of an
 * `Error`, or the string form of anything else that was thrown.
 */
function describeThrownValue(thrown: unknown): string {
  return thrown instanceof Error ? thrown.message : String(thrown);
}

/**
 * Bottom link of the interceptor chain: adapts the `InterceptingCallInterface`
 * onto an underlying `Call`, serializing outgoing messages and deserializing
 * incoming ones with the method definition.
 */
class BaseInterceptingCall implements InterceptingCallInterface {
  /**
   * @param call The underlying call that operations are forwarded to.
   * @param methodDefinition Supplies `requestSerialize` and
   *     `responseDeserialize` for this method.
   */
  constructor(
    protected call: Call,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    protected methodDefinition: ClientMethodDefinition<any, any>
  ) {}

  /** Cancels the underlying call with the given status. */
  cancelWithStatus(status: Status, details: string): void {
    this.call.cancelWithStatus(status, details);
  }

  /** Returns the peer reported by the underlying call. */
  getPeer(): string {
    return this.call.getPeer();
  }

  /** Sets the credentials on the underlying call. */
  setCredentials(credentials: CallCredentials): void {
    this.call.setCredentials(credentials);
  }

  /**
   * Serializes `message` and sends it on the underlying call. If
   * serialization throws, nothing is sent and the call is cancelled with
   * `Status.INTERNAL` and a description of the failure.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void {
    let serialized: Buffer;
    try {
      serialized = this.methodDefinition.requestSerialize(message);
    } catch (e: unknown) {
      this.call.cancelWithStatus(
        Status.INTERNAL,
        `Request message serialization failure: ${describeThrownValue(e)}`
      );
      return;
    }
    this.call.sendMessageWithContext(context, serialized);
  }

  /** Sends a message with an empty message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any) {
    this.sendMessageWithContext({}, message);
  }

  /**
   * Starts the underlying call and relays its events to
   * `interceptingListener`, deserializing each message first.
   *
   * If deserialization throws, the message is not delivered, the call is
   * cancelled with `Status.INTERNAL`, and that error status is what the
   * listener later receives instead of the status reported by the call.
   */
  start(
    metadata: Metadata,
    interceptingListener?: Partial<InterceptingListener>
  ): void {
    // Set when a response fails to deserialize; overrides the final status.
    let readError: StatusObject | null = null;
    this.call.start(metadata, {
      onReceiveMetadata: (metadata) => {
        interceptingListener?.onReceiveMetadata?.(metadata);
      },
      onReceiveMessage: (message) => {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        let deserialized: any;
        try {
          deserialized = this.methodDefinition.responseDeserialize(message);
        } catch (e: unknown) {
          readError = {
            code: Status.INTERNAL,
            details: `Response message parsing error: ${describeThrownValue(e)}`,
            metadata: new Metadata(),
          };
          this.call.cancelWithStatus(readError.code, readError.details);
          return;
        }
        interceptingListener?.onReceiveMessage?.(deserialized);
      },
      onReceiveStatus: (status) => {
        if (readError) {
          interceptingListener?.onReceiveStatus?.(readError);
        } else {
          interceptingListener?.onReceiveStatus?.(status);
        }
      },
    });
  }

  /** Requests a read on the underlying call. */
  startRead() {
    this.call.startRead();
  }

  /** Half-closes the underlying call. */
  halfClose(): void {
    this.call.halfClose();
  }
}
|
436 |
|
437 |
|
438 |
|
439 |
|
440 |
|
/**
 * Bottom-of-chain call used when the method's response is not a stream.
 *
 * On `start` it wraps the listener so that, if the status arrives before any
 * message has been received, the listener is first given a `null` message.
 * It also requests a read on the underlying call immediately after starting.
 */
class BaseUnaryInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
    // Tracks whether a response message was delivered before the status.
    let sawMessage = false;
    const forwardMetadata =
      listener?.onReceiveMetadata?.bind(listener) ?? ((metadata) => {});
    const wrapperListener: InterceptingListener = {
      onReceiveMetadata: forwardMetadata,
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any) => {
        sawMessage = true;
        listener?.onReceiveMessage?.(message);
      },
      onReceiveStatus: (status: StatusObject) => {
        if (!sawMessage) {
          // No response arrived: deliver an explicit null message first.
          listener?.onReceiveMessage?.(null);
        }
        listener?.onReceiveStatus?.(status);
      },
    };
    super.start(metadata, wrapperListener);
    this.call.startRead();
  }
}
|
469 |
|
470 |
|
471 |
|
472 |
|
473 |
|
/**
 * Bottom-of-chain call used when the method's response is a stream. It adds
 * nothing to `BaseInterceptingCall`; reads are left to the caller.
 */
class BaseStreamingInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {}
|
477 |
|
/**
 * Creates the underlying call for `methodDefinition.path` and wraps it in the
 * bottom link of the interceptor chain: the streaming wrapper when the method
 * has a response stream, the unary wrapper otherwise.
 *
 * @param channel Channel on which the call is created.
 * @param options Options used to create the call.
 * @param methodDefinition Definition of the method being invoked.
 */
function getBottomInterceptingCall(
  channel: Channel,
  options: InterceptorOptions,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>
) {
  const underlyingCall = getCall(channel, methodDefinition.path, options);
  return methodDefinition.responseStream
    ? new BaseStreamingInterceptingCall(underlyingCall, methodDefinition)
    : new BaseUnaryInterceptingCall(underlyingCall, methodDefinition);
}
|
491 |
|
/**
 * Function that, given interceptor options, produces the next call in the
 * interceptor chain.
 */
export interface NextCall {
  (options: InterceptorOptions): InterceptingCallInterface;
}
|
495 |
|
/**
 * An interceptor: receives the options for a call plus a `nextCall` factory
 * for the rest of the chain, and returns the `InterceptingCall` to use.
 */
export interface Interceptor {
  (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}
|
499 |
|
/**
 * Function that selects an interceptor for a given method definition.
 */
export interface InterceptorProvider {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
|
504 |
|
/**
 * Interceptor configuration consumed by `getInterceptingCall`: interceptors
 * and interceptor providers at client level and at call level.
 */
export interface InterceptorArguments {
  /** Interceptors configured on the client. */
  clientInterceptors: Interceptor[];
  /** Interceptor providers configured on the client. */
  clientInterceptorProviders: InterceptorProvider[];
  /** Interceptors configured for the individual call. */
  callInterceptors: Interceptor[];
  /** Interceptor providers configured for the individual call. */
  callInterceptorProviders: InterceptorProvider[];
}
|
511 |
|
/**
 * Builds the interceptor chain for one call and returns its outermost link.
 *
 * Call-level interceptors/providers, when any are given, are used instead of
 * the client-level ones. Providers are invoked with `methodDefinition`, and
 * falsy results are dropped. Interceptors run in array order, each receiving
 * a `nextCall` for the remainder of the chain; the end of the chain creates
 * the underlying call on `channel`.
 *
 * @param interceptorArgs Client-level and call-level interceptor settings.
 * @param methodDefinition Definition of the method being invoked; also
 *     exposed to interceptors as `options.method_definition`.
 * @param options Call options, copied into the interceptor options.
 * @param channel Channel on which the underlying call is created.
 * @returns The call produced by the outermost interceptor (or the bottom
 *     call when there are no interceptors).
 * @throws InterceptorConfigurationError If both interceptors and interceptor
 *     providers are supplied at the same level (client or call).
 */
export function getInterceptingCall(
  interceptorArgs: InterceptorArguments,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  methodDefinition: ClientMethodDefinition<any, any>,
  options: CallOptions,
  channel: Channel
): InterceptingCallInterface {
  const {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors,
    callInterceptorProviders,
  } = interceptorArgs;

  if (clientInterceptors.length > 0 && clientInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as options ' +
        'to the client constructor. Only one of these is allowed.'
    );
  }
  if (callInterceptors.length > 0 && callInterceptorProviders.length > 0) {
    throw new InterceptorConfigurationError(
      'Both interceptors and interceptor_providers were passed as call ' +
        'options. Only one of these is allowed.'
    );
  }

  // Call-level configuration, when present, replaces client-level
  // configuration entirely.
  const useCallLevel =
    callInterceptors.length > 0 || callInterceptorProviders.length > 0;
  const explicit = useCallLevel ? callInterceptors : clientInterceptors;
  const providers = useCallLevel
    ? callInterceptorProviders
    : clientInterceptorProviders;
  const interceptors: Interceptor[] = ([] as Interceptor[])
    .concat(
      explicit,
      providers.map((provider) => provider(methodDefinition))
    )
    .filter((interceptor) => interceptor);

  const interceptorOptions = Object.assign({}, options, {
    method_definition: methodDefinition,
  });

  // Build the chain from the innermost link outwards so the first
  // interceptor in the array ends up outermost.
  let chain: NextCall = (finalOptions: InterceptorOptions) =>
    getBottomInterceptingCall(channel, finalOptions, methodDefinition);
  for (const interceptor of interceptors.slice().reverse()) {
    const rest = chain;
    chain = (currentOptions) => interceptor(currentOptions, rest);
  }
  return chain(interceptorOptions);
}
|