// Retrieved from UNPKG (plain-text view, 24.8 kB).
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
import {
  ClientDuplexStream,
  ClientDuplexStreamImpl,
  ClientReadableStream,
  ClientReadableStreamImpl,
  ClientUnaryCall,
  ClientUnaryCallImpl,
  ClientWritableStream,
  ClientWritableStreamImpl,
  ServiceError,
  callErrorFromStatus,
  SurfaceCall,
} from './call';
import { CallCredentials } from './call-credentials';
import { StatusObject } from './call-interface';
import { Channel, ChannelImplementation } from './channel';
import { ConnectivityState } from './connectivity-state';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
import { Metadata } from './metadata';
import { ClientMethodDefinition } from './make-client';
import {
  getInterceptingCall,
  Interceptor,
  InterceptorProvider,
  InterceptorArguments,
  InterceptingCallInterface,
} from './client-interceptors';
import {
  ServerUnaryCall,
  ServerReadableStream,
  ServerWritableStream,
  ServerDuplexStream,
} from './server-call';
import { Deadline } from './deadline';
54
/* Module-private property keys under which Client stores its internal state.
 * Each symbol is unique to this module; the description only labels it in
 * debuggers and inspection output and does not make it shareable.
 * NOTE(review): presumably symbols are used so these fields cannot collide
 * with method names added by generated client subclasses - confirm. */
const CHANNEL_SYMBOL = Symbol('channel');
const INTERCEPTOR_SYMBOL = Symbol('interceptors');
const INTERCEPTOR_PROVIDER_SYMBOL = Symbol('interceptorProviders');
const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol('callInvocationTransformer');
59
/**
 * Type guard used to tell a trailing callback apart from the optional
 * metadata / call-options arguments that may precede it.
 *
 * @param arg Value to classify.
 * @returns `true` when `typeof arg` is `'function'`, narrowing `arg` to
 *     `UnaryCallback<ResponseType>`.
 */
function isFunction<ResponseType>(
  arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
): arg is UnaryCallback<ResponseType> {
  return typeof arg === 'function';
}
65
/**
 * Completion callback for calls that produce a single response.
 *
 * In this file it is invoked exactly one way or the other: with an error as
 * the only argument, or with `null` followed by the response value.
 */
export interface UnaryCallback<ResponseType> {
  (err: ServiceError | null, value?: ResponseType): void;
}
69
/* eslint-disable @typescript-eslint/no-explicit-any */
/**
 * Per-call options accepted by the `make*Request` methods of `Client`.
 *
 * Every field is optional. Within this file only `interceptors` and
 * `interceptor_providers` are read directly; the whole object is otherwise
 * handed to `getInterceptingCall` unchanged.
 */
export interface CallOptions {
  /** Deadline for the call. */
  deadline?: Deadline;
  /** NOTE(review): presumably overrides the host/authority for this call - confirm. */
  host?: string;
  /** Server-side call that this client call is made on behalf of. */
  parent?:
    | ServerUnaryCall<any, any>
    | ServerReadableStream<any, any>
    | ServerWritableStream<any, any>
    | ServerDuplexStream<any, any>;
  /** NOTE(review): presumably a bit mask selecting what is propagated from `parent` - confirm. */
  propagate_flags?: number;
  /** Call-level credentials. */
  credentials?: CallCredentials;
  /** Interceptors applied to this call only. */
  interceptors?: Interceptor[];
  /** Interceptor providers applied to this call only. */
  interceptor_providers?: InterceptorProvider[];
}
/* eslint-enable @typescript-eslint/no-explicit-any */
85
/**
 * Everything `Client` has assembled about a call before the underlying
 * intercepting call is created. This is the value handed to (and returned
 * from) a `CallInvocationTransformer`.
 */
export interface CallProperties<RequestType, ResponseType> {
  /** Request message; set for unary and server-streaming calls only. */
  argument?: RequestType;
  /** Metadata sent when the call is started. */
  metadata: Metadata;
  /** Surface object (emitter or stream) returned to the caller. */
  call: SurfaceCall;
  /** Channel the call is made on. */
  channel: Channel;
  /** Path, streaming flags and (de)serializers of the invoked method. */
  methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
  /** Options supplied by the caller for this call. */
  callOptions: CallOptions;
  /** Completion callback; set for unary and client-streaming calls only. */
  callback?: UnaryCallback<ResponseType>;
}
95
/**
 * Hook that receives the assembled `CallProperties` of a call and returns the
 * properties that should actually be used. `Client` invokes it after building
 * the surface call object and before creating the intercepting call.
 */
export interface CallInvocationTransformer {
  (callProperties: CallProperties<any, any>): CallProperties<any, any>; // eslint-disable-line @typescript-eslint/no-explicit-any
}
99
/**
 * Options accepted by the `Client` constructor: any subset of the channel
 * options plus the client-only settings below. The constructor removes
 * `interceptors`, `interceptor_providers` and `callInvocationTransformer`
 * from its private copy of the options before a channel is created from it.
 */
export type ClientOptions = Partial<ChannelOptions> & {
  /** Existing channel to use as-is instead of creating a new one. */
  channelOverride?: Channel;
  /**
   * Factory used to create the channel when `channelOverride` is not set.
   * It receives the remaining options with this property removed.
   */
  channelFactoryOverride?: (
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions
  ) => Channel;
  /** Client-level interceptors; cannot be combined with `interceptor_providers`. */
  interceptors?: Interceptor[];
  /** Client-level interceptor providers; cannot be combined with `interceptors`. */
  interceptor_providers?: InterceptorProvider[];
  /** Hook applied to the properties of every call made by the client. */
  callInvocationTransformer?: CallInvocationTransformer;
};
111
/**
 * Returns the stack frames of `error` without the leading message line.
 *
 * @param error Error whose `stack` is read.
 * @returns Every line of `error.stack` after the first, joined with `\n`;
 *     an empty string when `stack` is not available, since the property is
 *     optional on `Error`.
 */
function getErrorStackString(error: Error): string {
  return error.stack?.split('\n').slice(1).join('\n') ?? '';
}
115
/**
 * Creates the intercepting call for one RPC from the client-level interceptor
 * configuration and the per-call configuration found in `callOptions`.
 */
function createInterceptingCall<RequestType, ResponseType>(
  clientInterceptors: Interceptor[],
  clientInterceptorProviders: InterceptorProvider[],
  callProperties: CallProperties<RequestType, ResponseType>
): InterceptingCallInterface {
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors: callProperties.callOptions.interceptors ?? [],
    callInterceptorProviders:
      callProperties.callOptions.interceptor_providers ?? [],
  };
  return getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callProperties.callOptions,
    callProperties.channel
  );
}

/**
 * Starts a call that must yield exactly one response message and reports the
 * outcome through `callProperties.callback`.
 *
 * `callSiteError` must be created in the public request method so that its
 * stack describes the caller; the frames are appended to any error delivered
 * to the callback.
 */
function startUnaryResponseCall<RequestType, ResponseType>(
  call: InterceptingCallInterface,
  emitter: ClientUnaryCall,
  callProperties: CallProperties<RequestType, ResponseType>,
  callSiteError: Error
): void {
  let responseMessage: ResponseType | null = null;
  let receivedStatus = false;
  let callerStackError: Error | null = callSiteError;
  call.start(callProperties.metadata, {
    onReceiveMetadata: (metadata) => {
      emitter.emit('metadata', metadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any) {
      if (responseMessage !== null) {
        call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
      }
      responseMessage = message;
    },
    onReceiveStatus(status: StatusObject) {
      // Only the first status is acted upon.
      if (receivedStatus) {
        return;
      }
      receivedStatus = true;
      if (status.code === Status.OK && responseMessage !== null) {
        callProperties.callback!(null, responseMessage);
      } else {
        // An OK status without a message is reported as an INTERNAL error.
        const failure: StatusObject =
          status.code === Status.OK
            ? {
                code: Status.INTERNAL,
                details: 'No message received',
                metadata: status.metadata,
              }
            : status;
        const callerStack = getErrorStackString(callerStackError!);
        callProperties.callback!(callErrorFromStatus(failure, callerStack));
      }
      /* Avoid retaining the callerStackError object in the call context of
       * the status event handler. */
      callerStackError = null;
      emitter.emit('status', status);
    },
  });
}

/**
 * Starts a call whose responses are pushed into `stream`. The stream is ended
 * when the status arrives, and a non-OK status is additionally emitted as an
 * `'error'` event carrying the caller's stack frames.
 *
 * `callSiteError` must be created in the public request method so that its
 * stack describes the caller.
 */
function startStreamingResponseCall<ResponseType>(
  call: InterceptingCallInterface,
  stream: ClientReadableStream<ResponseType>,
  metadata: Metadata,
  callSiteError: Error
): void {
  let receivedStatus = false;
  let callerStackError: Error | null = callSiteError;
  call.start(metadata, {
    onReceiveMetadata(receivedMetadata: Metadata) {
      stream.emit('metadata', receivedMetadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any) {
      stream.push(message);
    },
    onReceiveStatus(status: StatusObject) {
      // Only the first status is acted upon.
      if (receivedStatus) {
        return;
      }
      receivedStatus = true;
      stream.push(null);
      if (status.code !== Status.OK) {
        const callerStack = getErrorStackString(callerStackError!);
        stream.emit('error', callErrorFromStatus(status, callerStack));
      }
      /* Avoid retaining the callerStackError object in the call context of
       * the status event handler. */
      callerStackError = null;
      stream.emit('status', status);
    },
  });
}

/**
 * A generic gRPC client. Primarily useful as a base class for all generated
 * clients.
 */
export class Client {
  private readonly [CHANNEL_SYMBOL]: Channel;
  private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
  private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
  private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;

  /**
   * @param address Target passed to the channel (or channel factory).
   * @param credentials Channel credentials passed to the channel (or factory).
   * @param options Client options. The object is copied, never mutated.
   * @throws Error when both `interceptors` and `interceptor_providers` are
   *     non-empty.
   */
  constructor(
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions = {}
  ) {
    // Work on a shallow copy so that the caller's object is left untouched.
    options = Object.assign({}, options);
    this[INTERCEPTOR_SYMBOL] = options.interceptors ?? [];
    delete options.interceptors;
    this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers ?? [];
    delete options.interceptor_providers;
    if (
      this[INTERCEPTOR_SYMBOL].length > 0 &&
      this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0
    ) {
      throw new Error(
        'Both interceptors and interceptor_providers were passed as options ' +
          'to the client constructor. Only one of these is allowed.'
      );
    }
    this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
      options.callInvocationTransformer;
    delete options.callInvocationTransformer;
    // Channel selection: explicit channel, then factory, then the default.
    if (options.channelOverride) {
      this[CHANNEL_SYMBOL] = options.channelOverride;
    } else if (options.channelFactoryOverride) {
      const channelFactoryOverride = options.channelFactoryOverride;
      delete options.channelFactoryOverride;
      this[CHANNEL_SYMBOL] = channelFactoryOverride(
        address,
        credentials,
        options
      );
    } else {
      this[CHANNEL_SYMBOL] = new ChannelImplementation(
        address,
        credentials,
        options
      );
    }
  }

  /** Closes the underlying channel. */
  close(): void {
    this[CHANNEL_SYMBOL].close();
  }

  /** Returns the channel this client makes its calls on. */
  getChannel(): Channel {
    return this[CHANNEL_SYMBOL];
  }

  /**
   * Waits until the channel reports `READY`, then invokes `callback` without
   * arguments. `callback` receives an Error instead when a connectivity watch
   * reports an error or when querying/watching the channel throws.
   *
   * @param deadline Deadline handed to each connectivity-state watch.
   * @param callback Invoked once with the outcome.
   */
  waitForReady(deadline: Deadline, callback: (error?: Error) => void): void {
    const channel = this[CHANNEL_SYMBOL];
    const checkState = (err?: Error): void => {
      if (err) {
        callback(new Error('Failed to connect before the deadline'));
        return;
      }
      let newState;
      try {
        // Passing `true` asks the channel to start connecting if needed.
        newState = channel.getConnectivityState(true);
      } catch {
        callback(new Error('The channel has been closed'));
        return;
      }
      if (newState === ConnectivityState.READY) {
        callback();
        return;
      }
      try {
        // Re-check on the next state change (or when the watch fails).
        channel.watchConnectivityState(newState, deadline, checkState);
      } catch {
        callback(new Error('The channel has been closed'));
      }
    };
    // The first check is deferred so that `callback` never runs synchronously.
    setImmediate(checkState);
  }

  /**
   * Normalizes the optional trailing arguments of the single-response request
   * methods, which may be `(callback)`, `(metadata, callback)`,
   * `(options, callback)` or `(metadata, options, callback)`.
   *
   * @throws Error when the arguments match none of these shapes.
   */
  private checkOptionalUnaryResponseArguments<ResponseType>(
    arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
    arg2?: CallOptions | UnaryCallback<ResponseType>,
    arg3?: UnaryCallback<ResponseType>
  ): {
    metadata: Metadata;
    options: CallOptions;
    callback: UnaryCallback<ResponseType>;
  } {
    if (isFunction(arg1)) {
      return { metadata: new Metadata(), options: {}, callback: arg1 };
    }
    if (isFunction(arg2)) {
      if (arg1 instanceof Metadata) {
        return { metadata: arg1, options: {}, callback: arg2 };
      }
      return { metadata: new Metadata(), options: arg1, callback: arg2 };
    }
    if (
      !(
        arg1 instanceof Metadata &&
        arg2 instanceof Object &&
        isFunction(arg3)
      )
    ) {
      throw new Error('Incorrect arguments passed');
    }
    return { metadata: arg1, options: arg2, callback: arg3 };
  }

  /**
   * Makes a unary call: one request message, one response delivered through
   * `callback`.
   *
   * @returns The surface call object, which emits `'metadata'` and `'status'`.
   * @throws Error when the optional arguments have an unsupported shape.
   */
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientUnaryCall {
    const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: false,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument: argument,
      metadata: checkedArguments.metadata,
      call: new ClientUnaryCallImpl(),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      callProperties = transformer(callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const emitter: ClientUnaryCall = callProperties.call;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    emitter.call = call;
    // The Error is created here so that its stack points at our caller.
    startUnaryResponseCall(call, emitter, callProperties, new Error());
    call.sendMessage(argument);
    call.halfClose();
    return emitter;
  }

  /**
   * Makes a client-streaming call: the caller writes request messages to the
   * returned stream and the single response is delivered through `callback`.
   *
   * @returns The writable surface stream, which also emits `'metadata'` and
   *     `'status'`.
   * @throws Error when the optional arguments have an unsupported shape.
   */
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType> {
    const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: true,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientWritableStreamImpl<RequestType>(serialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      callProperties = transformer(callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    emitter.call = call;
    // The Error is created here so that its stack points at our caller.
    startUnaryResponseCall(call, emitter, callProperties, new Error());
    return emitter;
  }

  /**
   * Normalizes the optional trailing arguments of the streaming-response
   * request methods, which may be `()`, `(metadata)`, `(options)` or
   * `(metadata, options)`. Missing parts default to empty metadata/options.
   */
  private checkMetadataAndOptions(
    arg1?: Metadata | CallOptions,
    arg2?: CallOptions
  ): { metadata: Metadata; options: CallOptions } {
    if (arg1 instanceof Metadata) {
      return { metadata: arg1, options: arg2 ? arg2 : {} };
    }
    // arg1 is either the options object or absent; arg2 is not consulted.
    const options: CallOptions = arg1 ? arg1 : {};
    return { metadata: new Metadata(), options };
  }

  /**
   * Makes a server-streaming call: one request message, responses delivered
   * through the returned readable stream.
   *
   * @returns The readable surface stream. It emits `'metadata'`, `'status'`
   *     and, for a non-OK status, `'error'`.
   */
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientReadableStream<ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: false,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument: argument,
      metadata: checkedArguments.metadata,
      call: new ClientReadableStreamImpl<ResponseType>(deserialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      callProperties = transformer(callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    stream.call = call;
    // The Error is created here so that its stack points at our caller.
    startStreamingResponseCall(
      call,
      stream,
      callProperties.metadata,
      new Error()
    );
    call.sendMessage(argument);
    call.halfClose();
    return stream;
  }

  /**
   * Makes a bidirectional-streaming call: the caller writes request messages
   * to, and reads response messages from, the returned duplex stream.
   *
   * @returns The duplex surface stream. It emits `'metadata'`, `'status'`
   *     and, for a non-OK status, `'error'`.
   */
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: true,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
        serialize,
        deserialize
      ),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
    };
    const transformer = this[CALL_INVOCATION_TRANSFORMER_SYMBOL];
    if (transformer) {
      callProperties = transformer(callProperties) as CallProperties<
        RequestType,
        ResponseType
      >;
    }
    const stream: ClientDuplexStream<
      RequestType,
      ResponseType
    > = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    stream.call = call;
    // The Error is created here so that its stack points at our caller.
    startStreamingResponseCall(
      call,
      stream,
      callProperties.metadata,
      new Error()
    );
    return stream;
  }
}