UNPKG

25.1 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 ClientDuplexStream,
20 ClientDuplexStreamImpl,
21 ClientReadableStream,
22 ClientReadableStreamImpl,
23 ClientUnaryCall,
24 ClientUnaryCallImpl,
25 ClientWritableStream,
26 ClientWritableStreamImpl,
27 ServiceError,
28 callErrorFromStatus,
29 SurfaceCall,
30} from './call';
31import { CallCredentials } from './call-credentials';
32import { StatusObject } from './call-interface';
33import { Channel, ChannelImplementation } from './channel';
34import { ConnectivityState } from './connectivity-state';
35import { ChannelCredentials } from './channel-credentials';
36import { ChannelOptions } from './channel-options';
37import { Status } from './constants';
38import { Metadata } from './metadata';
39import { ClientMethodDefinition } from './make-client';
40import {
41 getInterceptingCall,
42 Interceptor,
43 InterceptorProvider,
44 InterceptorArguments,
45 InterceptingCallInterface,
46} from './client-interceptors';
47import {
48 ServerUnaryCall,
49 ServerReadableStream,
50 ServerWritableStream,
51 ServerDuplexStream,
52} from './server-call';
53import { Deadline } from './deadline';
54
// Module-private symbol keys under which `Client` stores its per-instance
// state. Each one is used as a computed member name on the class: the channel,
// the client-level interceptors, the client-level interceptor providers, and
// the optional call invocation transformer, respectively.
const CHANNEL_SYMBOL = Symbol();
const INTERCEPTOR_SYMBOL = Symbol();
const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol();
59
/**
 * Type guard used while untangling overloaded call arguments: reports whether
 * the given positional argument is the completion callback rather than
 * metadata, call options, or an omitted argument.
 *
 * @param arg One of the optional trailing arguments of a unary-response call.
 * @returns `true` exactly when `arg` is callable.
 */
function isFunction<ResponseType>(
  arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
): arg is UnaryCallback<ResponseType> {
  const runtimeKind = typeof arg;
  return runtimeKind === 'function';
}
65
/**
 * Completion callback for calls that produce a single response message
 * (unary and client-streaming requests).
 *
 * @param err The failure describing why the call did not succeed, or `null`
 *     when a response message was received and the call ended with status OK.
 * @param value The response message. Only supplied when `err` is `null`.
 */
export interface UnaryCallback<ResponseType> {
  (err: ServiceError | null, value?: ResponseType): void;
}
69
/* eslint-disable @typescript-eslint/no-explicit-any */
/**
 * Per-call options accepted by the `Client.make*Request` methods.
 *
 * Within this file only `interceptors` and `interceptor_providers` are read
 * directly; the object as a whole is forwarded to `getInterceptingCall`, which
 * is where the remaining fields are interpreted.
 */
export interface CallOptions {
  deadline?: Deadline;
  host?: string;
  // A server-side call that this client call is associated with.
  // NOTE(review): presumably used for deadline/cancellation propagation in
  // combination with `propagate_flags` — confirm against the call implementation.
  parent?:
    | ServerUnaryCall<any, any>
    | ServerReadableStream<any, any>
    | ServerWritableStream<any, any>
    | ServerDuplexStream<any, any>;
  propagate_flags?: number;
  credentials?: CallCredentials;
  // Call-level interceptors, passed to `getInterceptingCall` as
  // `callInterceptors` (an empty list is used when omitted).
  interceptors?: Interceptor[];
  // Call-level interceptor providers, passed to `getInterceptingCall` as
  // `callInterceptorProviders` (an empty list is used when omitted).
  interceptor_providers?: InterceptorProvider[];
}
/* eslint-enable @typescript-eslint/no-explicit-any */
85
/**
 * Everything that describes a single call invocation. Each
 * `Client.make*Request` method builds one of these and, when the client was
 * configured with a `CallInvocationTransformer`, passes it through that
 * transformer before the call is created and started.
 */
export interface CallProperties<RequestType, ResponseType> {
  // The request message. Populated for unary and server-streaming requests;
  // left unset for client-streaming and bidirectional requests.
  argument?: RequestType;
  // Metadata the call is started with.
  metadata: Metadata;
  // The surface object (emitter/stream) that is returned to the caller.
  call: SurfaceCall;
  // The channel handed to `getInterceptingCall` for this call.
  channel: Channel;
  methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
  callOptions: CallOptions;
  // Completion callback. Populated only for unary-response calls (unary and
  // client-streaming requests).
  callback?: UnaryCallback<ResponseType>;
}
95
/**
 * Hook that receives the `CallProperties` assembled for a call and returns the
 * properties that should actually be used. The client reads the surface call
 * object, metadata, channel, method definition, call options and callback from
 * the returned value.
 */
export interface CallInvocationTransformer {
  (callProperties: CallProperties<any, any>): CallProperties<any, any>; // eslint-disable-line @typescript-eslint/no-explicit-any
}
99
/**
 * Options accepted by the `Client` constructor: any subset of the channel
 * options plus the client-specific settings below. The client-specific
 * `interceptors`, `interceptor_providers` and `callInvocationTransformer`
 * entries are removed from a copy of the options before that copy is passed on
 * to a channel factory or to the default channel implementation.
 */
export type ClientOptions = Partial<ChannelOptions> & {
  // Use this channel as-is instead of creating one. Takes precedence over
  // `channelFactoryOverride`.
  channelOverride?: Channel;
  // Called with the address, the credentials and the remaining options to
  // create the channel, in place of the default channel implementation.
  channelFactoryOverride?: (
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions
  ) => Channel;
  // Client-level interceptors. The constructor throws if both this and
  // `interceptor_providers` are non-empty.
  interceptors?: Interceptor[];
  // Client-level interceptor providers. The constructor throws if both this
  // and `interceptors` are non-empty.
  interceptor_providers?: InterceptorProvider[];
  // Optional hook applied to the properties of every call made by the client.
  callInvocationTransformer?: CallInvocationTransformer;
};
111
/**
 * Extracts the stack frames of an error as text, dropping the first line of
 * the stack (the part before the first line break).
 *
 * @param error The error whose stack should be rendered.
 * @returns Everything after the first line of `error.stack`, or the fixed
 *     placeholder text when the stack is missing or has nothing after its
 *     first line.
 */
function getErrorStackString(error: Error): string {
  const rawStack = error.stack;
  if (rawStack == null) {
    return 'no stack trace available';
  }
  const firstBreak = rawStack.indexOf('\n');
  const frames = firstBreak === -1 ? '' : rawStack.slice(firstBreak + 1);
  if (frames === '') {
    return 'no stack trace available';
  }
  return frames;
}
115
/**
 * Creates the intercepting call for one invocation from the client-level
 * interceptor configuration and the (possibly transformed) call properties.
 *
 * Kept as a module-level function rather than a method so that it cannot
 * collide with method names added to `Client` subclasses.
 *
 * @param clientInterceptors Interceptors configured on the client.
 * @param clientInterceptorProviders Interceptor providers configured on the
 *     client.
 * @param callProperties Properties of the call being made; its call options,
 *     method definition and channel are used.
 * @returns The call object returned by `getInterceptingCall`.
 */
function createInterceptingCall<RequestType, ResponseType>(
  clientInterceptors: Interceptor[],
  clientInterceptorProviders: InterceptorProvider[],
  callProperties: CallProperties<RequestType, ResponseType>
): InterceptingCallInterface {
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors: callProperties.callOptions.interceptors ?? [],
    callInterceptorProviders:
      callProperties.callOptions.interceptor_providers ?? [],
  };
  return getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callProperties.callOptions,
    callProperties.channel
  );
}

/**
 * Starts a call whose response is a single message (unary and
 * client-streaming requests) and wires its events to the surface emitter and
 * to the completion callback.
 *
 * The callback is invoked exactly once, when the first status arrives:
 *  - status OK with a message: `callback(null, message)`;
 *  - status OK without a message: an INTERNAL "No message received" error;
 *  - any other status: an error built from that status.
 * Afterwards the `status` event is emitted on the emitter. Receiving a second
 * message cancels the call with an INTERNAL status.
 *
 * @param call The intercepting call to start.
 * @param emitter Surface object on which `metadata` and `status` are emitted.
 * @param callProperties Supplies the metadata to start with and the callback.
 * @param callerStackError Error captured in the public entry point; its stack
 *     is attached to any error reported to the callback.
 */
function startUnaryResponseCall<RequestType, ResponseType>(
  call: InterceptingCallInterface,
  emitter: SurfaceCall,
  callProperties: CallProperties<RequestType, ResponseType>,
  callerStackError: Error | null
): void {
  let responseMessage: ResponseType | null = null;
  let receivedStatus = false;
  call.start(callProperties.metadata, {
    onReceiveMetadata: metadata => {
      emitter.emit('metadata', metadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any) {
      if (responseMessage !== null) {
        call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
      }
      responseMessage = message;
    },
    onReceiveStatus(status: StatusObject) {
      // Only the first status is acted upon.
      if (receivedStatus) {
        return;
      }
      receivedStatus = true;
      // The callback is deliberately invoked as a member of callProperties
      // (not through a local alias) so its `this` binding stays as it was.
      if (status.code === Status.OK) {
        if (responseMessage === null) {
          const callerStack = getErrorStackString(callerStackError!);
          callProperties.callback!(
            callErrorFromStatus(
              {
                code: Status.INTERNAL,
                details: 'No message received',
                metadata: status.metadata,
              },
              callerStack
            )
          );
        } else {
          callProperties.callback!(null, responseMessage);
        }
      } else {
        const callerStack = getErrorStackString(callerStackError!);
        callProperties.callback!(callErrorFromStatus(status, callerStack));
      }
      /* Avoid retaining the callerStackError object in the call context of
       * the status event handler. */
      callerStackError = null;
      emitter.emit('status', status);
    },
  });
}

/**
 * Starts a call whose response is a stream of messages (server-streaming and
 * bidirectional requests) and wires its events to the surface stream.
 *
 * Each received message is pushed onto the stream. On the first status the
 * stream is ended (`push(null)`), an `error` event is emitted when the status
 * is not OK, and finally the `status` event is emitted.
 *
 * @param call The intercepting call to start.
 * @param stream Surface stream that receives messages and events.
 * @param metadata Metadata to start the call with.
 * @param callerStackError Error captured in the public entry point; its stack
 *     is attached to any emitted error.
 */
function startStreamingResponseCall<ResponseType>(
  call: InterceptingCallInterface,
  stream: ClientReadableStream<ResponseType>,
  metadata: Metadata,
  callerStackError: Error | null
): void {
  let receivedStatus = false;
  call.start(metadata, {
    onReceiveMetadata(receivedMetadata: Metadata) {
      stream.emit('metadata', receivedMetadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any) {
      stream.push(message);
    },
    onReceiveStatus(status: StatusObject) {
      // Only the first status is acted upon.
      if (receivedStatus) {
        return;
      }
      receivedStatus = true;
      stream.push(null);
      if (status.code !== Status.OK) {
        const callerStack = getErrorStackString(callerStackError!);
        stream.emit('error', callErrorFromStatus(status, callerStack));
      }
      /* Avoid retaining the callerStackError object in the call context of
       * the status event handler. */
      callerStackError = null;
      stream.emit('status', status);
    },
  });
}

/**
 * A generic gRPC client. Primarily useful as a base class for all generated
 * clients.
 */
export class Client {
  private readonly [CHANNEL_SYMBOL]: Channel;
  private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
  private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
  private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;

  /**
   * @param address Target passed to the channel factory or to the default
   *     channel implementation. Unused when `options.channelOverride` is set.
   * @param credentials Channel credentials, passed on like `address`.
   * @param options Channel options plus client-specific settings. The object
   *     is shallow-copied; the caller's object is not modified.
   * @throws Error when both `interceptors` and `interceptor_providers` are
   *     non-empty.
   */
  constructor(
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions = {}
  ) {
    // Work on a copy so the client-only entries can be stripped before the
    // remaining options are handed to the channel.
    options = Object.assign({}, options);
    this[INTERCEPTOR_SYMBOL] = options.interceptors ?? [];
    delete options.interceptors;
    this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers ?? [];
    delete options.interceptor_providers;
    if (
      this[INTERCEPTOR_SYMBOL].length > 0 &&
      this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0
    ) {
      throw new Error(
        'Both interceptors and interceptor_providers were passed as options ' +
          'to the client constructor. Only one of these is allowed.'
      );
    }
    this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
      options.callInvocationTransformer;
    delete options.callInvocationTransformer;
    // Channel selection: explicit channel, then factory, then the default
    // implementation.
    if (options.channelOverride) {
      this[CHANNEL_SYMBOL] = options.channelOverride;
    } else if (options.channelFactoryOverride) {
      const channelFactoryOverride = options.channelFactoryOverride;
      delete options.channelFactoryOverride;
      this[CHANNEL_SYMBOL] = channelFactoryOverride(
        address,
        credentials,
        options
      );
    } else {
      this[CHANNEL_SYMBOL] = new ChannelImplementation(
        address,
        credentials,
        options
      );
    }
  }

  /** Closes the underlying channel. */
  close(): void {
    this[CHANNEL_SYMBOL].close();
  }

  /** Returns the channel this client makes its calls on. */
  getChannel(): Channel {
    return this[CHANNEL_SYMBOL];
  }

  /**
   * Waits until the channel reports the READY connectivity state.
   *
   * The first check is deferred with `setImmediate`, so `callback` is never
   * invoked synchronously from this method.
   *
   * @param deadline Passed to the channel's connectivity-state watcher.
   * @param callback Invoked with no argument once the channel is READY, or
   *     with an `Error` when the watcher reports an error or the channel
   *     throws while being queried or watched.
   */
  waitForReady(deadline: Deadline, callback: (error?: Error) => void): void {
    const checkState = (err?: Error) => {
      if (err) {
        callback(new Error('Failed to connect before the deadline'));
        return;
      }
      let newState: ConnectivityState;
      try {
        // `true` asks the channel to try to connect.
        newState = this[CHANNEL_SYMBOL].getConnectivityState(true);
      } catch {
        callback(new Error('The channel has been closed'));
        return;
      }
      if (newState === ConnectivityState.READY) {
        callback();
      } else {
        try {
          // Re-run this check on the next state change (or watcher error).
          this[CHANNEL_SYMBOL].watchConnectivityState(
            newState,
            deadline,
            checkState
          );
        } catch {
          callback(new Error('The channel has been closed'));
        }
      }
    };
    setImmediate(checkState);
  }

  /**
   * Resolves the optional trailing arguments of a unary-response call, which
   * may be `(callback)`, `(metadata, callback)`, `(options, callback)` or
   * `(metadata, options, callback)`.
   *
   * @returns The metadata (a fresh `Metadata` when omitted), the options (an
   *     empty object when omitted) and the callback.
   * @throws Error when the arguments match none of the accepted shapes.
   */
  private checkOptionalUnaryResponseArguments<ResponseType>(
    arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
    arg2?: CallOptions | UnaryCallback<ResponseType>,
    arg3?: UnaryCallback<ResponseType>
  ): {
    metadata: Metadata;
    options: CallOptions;
    callback: UnaryCallback<ResponseType>;
  } {
    if (isFunction(arg1)) {
      return { metadata: new Metadata(), options: {}, callback: arg1 };
    } else if (isFunction(arg2)) {
      if (arg1 instanceof Metadata) {
        return { metadata: arg1, options: {}, callback: arg2 };
      } else {
        return { metadata: new Metadata(), options: arg1, callback: arg2 };
      }
    } else {
      if (
        !(
          arg1 instanceof Metadata &&
          arg2 instanceof Object &&
          isFunction(arg3)
        )
      ) {
        throw new Error('Incorrect arguments passed');
      }
      return { metadata: arg1, options: arg2, callback: arg3 };
    }
  }

  /**
   * Makes a unary request: sends one message and reports the single response
   * (or the failure) through `callback`.
   *
   * NOTE(review): the message sent is the `argument` parameter itself; an
   * `argument` replaced by a call invocation transformer is not used.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for the request message.
   * @param deserialize Deserializer for the response message.
   * @param argument The request message.
   * @param metadata Optional metadata; may be omitted.
   * @param options Optional call options; may be omitted.
   * @param callback Completion callback.
   * @returns The surface call object, on which `metadata` and `status` events
   *     are emitted.
   * @throws Error when the optional arguments have an unsupported shape.
   */
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientUnaryCall {
    const checkedArguments =
      this.checkOptionalUnaryResponseArguments<ResponseType>(
        metadata,
        options,
        callback
      );
    const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> =
      {
        path: method,
        requestStream: false,
        responseStream: false,
        requestSerialize: serialize,
        responseDeserialize: deserialize,
      };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument: argument,
      metadata: checkedArguments.metadata,
      call: new ClientUnaryCallImpl(),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    // Invoked as a member of `this` so the transformer's `this` binding is
    // the client instance.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const emitter: ClientUnaryCall = callProperties.call;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    emitter.call = call;
    // The Error is created here, not in the helper, so that the captured
    // stack starts at this method.
    startUnaryResponseCall(call, emitter, callProperties, new Error());
    call.sendMessage(argument);
    call.halfClose();
    return emitter;
  }

  /**
   * Makes a client-streaming request: the caller writes request messages to
   * the returned stream, and the single response (or the failure) is reported
   * through `callback`.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for request messages.
   * @param deserialize Deserializer for the response message.
   * @param metadata Optional metadata; may be omitted.
   * @param options Optional call options; may be omitted.
   * @param callback Completion callback.
   * @returns The writable surface stream, on which `metadata` and `status`
   *     events are emitted.
   * @throws Error when the optional arguments have an unsupported shape.
   */
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType> {
    const checkedArguments =
      this.checkOptionalUnaryResponseArguments<ResponseType>(
        metadata,
        options,
        callback
      );
    const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> =
      {
        path: method,
        requestStream: true,
        responseStream: false,
        requestSerialize: serialize,
        responseDeserialize: deserialize,
      };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientWritableStreamImpl<RequestType>(serialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    // Invoked as a member of `this` so the transformer's `this` binding is
    // the client instance.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const emitter: ClientWritableStream<RequestType> =
      callProperties.call as ClientWritableStream<RequestType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    emitter.call = call;
    // The Error is created here, not in the helper, so that the captured
    // stack starts at this method.
    startUnaryResponseCall(call, emitter, callProperties, new Error());
    return emitter;
  }

  /**
   * Resolves the optional trailing arguments of a streaming-response call,
   * which may be `()`, `(metadata)`, `(options)` or `(metadata, options)`.
   *
   * @returns The metadata (a fresh `Metadata` when the first argument is not
   *     a `Metadata` instance) and the options (an empty object when omitted).
   */
  private checkMetadataAndOptions(
    arg1?: Metadata | CallOptions,
    arg2?: CallOptions
  ): { metadata: Metadata; options: CallOptions } {
    if (arg1 instanceof Metadata) {
      return { metadata: arg1, options: arg2 || {} };
    }
    // No metadata given: the first argument, if any, is the options object.
    const options: CallOptions = arg1 || {};
    return { metadata: new Metadata(), options };
  }

  /**
   * Makes a server-streaming request: sends one message and delivers the
   * responses through the returned readable stream.
   *
   * NOTE(review): the message sent is the `argument` parameter itself; an
   * `argument` replaced by a call invocation transformer is not used.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for the request message.
   * @param deserialize Deserializer for response messages.
   * @param argument The request message.
   * @param metadata Optional metadata; may be omitted.
   * @param options Optional call options.
   * @returns The readable surface stream. It emits `metadata` and `status`
   *     events, and an `error` event when the call ends with a non-OK status.
   */
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientReadableStream<ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> =
      {
        path: method,
        requestStream: false,
        responseStream: true,
        requestSerialize: serialize,
        responseDeserialize: deserialize,
      };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument: argument,
      metadata: checkedArguments.metadata,
      call: new ClientReadableStreamImpl<ResponseType>(deserialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
    };
    // Invoked as a member of `this` so the transformer's `this` binding is
    // the client instance.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const stream: ClientReadableStream<ResponseType> =
      callProperties.call as ClientReadableStream<ResponseType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    stream.call = call;
    // The Error is created here, not in the helper, so that the captured
    // stack starts at this method.
    startStreamingResponseCall(
      call,
      stream,
      callProperties.metadata,
      new Error()
    );
    call.sendMessage(argument);
    call.halfClose();
    return stream;
  }

  /**
   * Makes a bidirectional-streaming request: the caller writes request
   * messages to, and reads response messages from, the returned duplex stream.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for request messages.
   * @param deserialize Deserializer for response messages.
   * @param metadata Optional metadata; may be omitted.
   * @param options Optional call options.
   * @returns The duplex surface stream. It emits `metadata` and `status`
   *     events, and an `error` event when the call ends with a non-OK status.
   */
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> =
      {
        path: method,
        requestStream: true,
        responseStream: true,
        requestSerialize: serialize,
        responseDeserialize: deserialize,
      };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
        serialize,
        deserialize
      ),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
    };
    // Invoked as a member of `this` so the transformer's `this` binding is
    // the client instance.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const stream: ClientDuplexStream<RequestType, ResponseType> =
      callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    stream.call = call;
    // The Error is created here, not in the helper, so that the captured
    // stack starts at this method.
    startStreamingResponseCall(
      call,
      stream,
      callProperties.metadata,
      new Error()
    );
    return stream;
  }
}
713 return stream;
714 }
715}