UNPKG

24.2 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import {
19 ClientDuplexStream,
20 ClientDuplexStreamImpl,
21 ClientReadableStream,
22 ClientReadableStreamImpl,
23 ClientUnaryCall,
24 ClientUnaryCallImpl,
25 ClientWritableStream,
26 ClientWritableStreamImpl,
27 ServiceError,
28 callErrorFromStatus,
29 SurfaceCall,
30} from './call';
31import { CallCredentials } from './call-credentials';
32import { StatusObject } from './call-interface';
33import { Channel, ChannelImplementation } from './channel';
34import { ConnectivityState } from './connectivity-state';
35import { ChannelCredentials } from './channel-credentials';
36import { ChannelOptions } from './channel-options';
37import { Status } from './constants';
38import { Metadata } from './metadata';
39import { ClientMethodDefinition } from './make-client';
40import {
41 getInterceptingCall,
42 Interceptor,
43 InterceptorProvider,
44 InterceptorArguments,
45 InterceptingCallInterface,
46} from './client-interceptors';
47import {
48 ServerUnaryCall,
49 ServerReadableStream,
50 ServerWritableStream,
51 ServerDuplexStream,
52} from './server-call';
53import { Deadline } from './deadline';
54
// Keys under which a Client instance stores its internal state. They are
// symbols, so they cannot clash with any string-named property.

// Key for the Channel that the client issues its calls on.
const CHANNEL_SYMBOL = Symbol();
// Key for the client-level Interceptor list taken from the constructor options.
const INTERCEPTOR_SYMBOL = Symbol();
// Key for the client-level InterceptorProvider list taken from the constructor
// options.
const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
// Key for the optional CallInvocationTransformer taken from the constructor
// options.
const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol();
59
/**
 * Type guard used while sorting out the optional trailing arguments of a
 * request method: tells whether the given positional argument is the
 * completion callback rather than metadata, call options, or nothing at all.
 *
 * @param arg A positional argument of unknown role.
 * @returns `true` exactly when `typeof arg` is `'function'`.
 */
function isFunction<ResponseType>(
  arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
): arg is UnaryCallback<ResponseType> {
  const runtimeKind = typeof arg;
  return runtimeKind === 'function';
}
65
/**
 * Completion callback for calls that produce a single response message.
 *
 * The client invokes it either with a `ServiceError` as `err` (and no value),
 * or with `err` set to `null` and the response message as `value`.
 */
export interface UnaryCallback<ResponseType> {
  (err: ServiceError | null, value?: ResponseType): void;
}
69
70/* eslint-disable @typescript-eslint/no-explicit-any */
/**
 * Per-call options accepted by the request methods of `Client`.
 *
 * The whole object is handed to `getInterceptingCall` when the call is
 * created; the client itself only reads `interceptors` and
 * `interceptor_providers` from it.
 */
export interface CallOptions {
  // NOTE(review): presumably the point in time after which the call should be
  // abandoned — confirm against the Deadline type and the channel code.
  deadline?: Deadline;
  // NOTE(review): presumably overrides the host/authority used for the call —
  // confirm against the channel implementation.
  host?: string;
  // A server-side call object this client call is associated with.
  // NOTE(review): presumably used for deadline/cancellation propagation —
  // confirm.
  parent?:
    | ServerUnaryCall<any, any>
    | ServerReadableStream<any, any>
    | ServerWritableStream<any, any>
    | ServerDuplexStream<any, any>;
  // NOTE(review): presumably a bit mask selecting what is propagated from
  // `parent` — confirm.
  propagate_flags?: number;
  credentials?: CallCredentials;
  // Interceptors for this call only; passed on as `callInterceptors`.
  interceptors?: Interceptor[];
  // Interceptor providers for this call only; passed on as
  // `callInterceptorProviders`.
  interceptor_providers?: InterceptorProvider[];
}
84/* eslint-enable @typescript-eslint/no-explicit-any */
85
/**
 * Everything the client has assembled for one request, bundled so that a
 * `CallInvocationTransformer` can inspect or replace it before the underlying
 * call is created.
 */
export interface CallProperties<RequestType, ResponseType> {
  // The single request message; populated by the unary and server-streaming
  // request methods only.
  argument?: RequestType;
  // Metadata that the call is started with.
  metadata: Metadata;
  // The call object that is returned to the caller of the request method.
  call: SurfaceCall;
  // The channel passed to `getInterceptingCall` for this request.
  channel: Channel;
  // Path, streaming flags and (de)serializers of the invoked method.
  methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
  // The per-call options resolved from the request method's arguments.
  callOptions: CallOptions;
  // Completion callback; populated by the unary and client-streaming request
  // methods only.
  callback?: UnaryCallback<ResponseType>;
}
95
/**
 * Hook configured through `ClientOptions.callInvocationTransformer`. Each
 * request method calls it with the `CallProperties` it has assembled and then
 * continues with whatever properties the hook returns.
 */
export interface CallInvocationTransformer {
  (callProperties: CallProperties<any, any>): CallProperties<any, any>; // eslint-disable-line @typescript-eslint/no-explicit-any
}
99
/**
 * Options accepted by the `Client` constructor: any subset of the channel
 * options plus the client-specific settings below. The client-specific
 * interceptor and transformer settings are stripped before the remaining
 * options are forwarded to a channel or channel factory.
 */
export type ClientOptions = Partial<ChannelOptions> & {
  // When set, the client uses this channel as-is instead of creating one.
  channelOverride?: Channel;
  // When set (and `channelOverride` is not), the client calls this factory
  // with the address, credentials and remaining options to obtain its channel.
  channelFactoryOverride?: (
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions
  ) => Channel;
  // Client-level interceptors. The constructor throws if this and
  // `interceptor_providers` are both non-empty.
  interceptors?: Interceptor[];
  // Client-level interceptor providers. The constructor throws if this and
  // `interceptors` are both non-empty.
  interceptor_providers?: InterceptorProvider[];
  // Hook applied to the properties of every request before its call is made.
  callInvocationTransformer?: CallInvocationTransformer;
};
111
/**
 * Returns the stack trace of `error` without its first line (the
 * "Name: message" header), so the frames can be appended to another error's
 * description.
 *
 * `Error.prototype.stack` is optional in the type and non-standard at runtime,
 * so it may be missing. In that case, or when there are no frames after the
 * header, a fixed placeholder is returned instead of throwing.
 *
 * @param error The error whose captured frames should be extracted.
 * @returns The frames joined by newlines, or `'no stack trace available'`.
 */
function getErrorStackString(error: Error): string {
  const frames = error.stack?.split('\n').slice(1).join('\n');
  return frames || 'no stack trace available';
}
115
/**
 * Creates the intercepting call for one request from the client-level
 * interceptor settings and the (possibly transformed) call properties.
 *
 * Shared by all four request methods of `Client`, which previously each
 * carried an identical copy of this wiring.
 *
 * @param clientInterceptors Interceptors configured on the client.
 * @param clientInterceptorProviders Interceptor providers configured on the
 *     client.
 * @param callProperties Properties of the request; its call options supply
 *     the per-call interceptors and interceptor providers.
 * @returns The call object produced by `getInterceptingCall`.
 */
function createInterceptingCall<RequestType, ResponseType>(
  clientInterceptors: Interceptor[],
  clientInterceptorProviders: InterceptorProvider[],
  callProperties: CallProperties<RequestType, ResponseType>
): InterceptingCallInterface {
  const interceptorArgs: InterceptorArguments = {
    clientInterceptors,
    clientInterceptorProviders,
    callInterceptors: callProperties.callOptions.interceptors ?? [],
    callInterceptorProviders:
      callProperties.callOptions.interceptor_providers ?? [],
  };
  return getInterceptingCall(
    interceptorArgs,
    callProperties.methodDefinition,
    callProperties.callOptions,
    callProperties.channel
  );
}

/**
 * Starts `call` with the listener used by requests that expect exactly one
 * response message (unary and client-streaming requests).
 *
 * The listener forwards metadata and status as events on
 * `callProperties.call`, and reports the outcome through
 * `callProperties.callback`: the response message on an OK status, or a
 * `ServiceError` when the status is not OK or no message arrived. A second
 * response message cancels the call with `Status.INTERNAL`.
 *
 * @param call The intercepting call to start.
 * @param callProperties Properties of the request; supplies the metadata, the
 *     emitter (`call`) and the completion callback.
 * @param callerStackError Error created in the request method itself, so that
 *     its stack describes the code that issued the request. Its stack is only
 *     read if an error has to be reported.
 */
function startSingleResponseCall<RequestType, ResponseType>(
  call: InterceptingCallInterface,
  callProperties: CallProperties<RequestType, ResponseType>,
  callerStackError: Error
): void {
  const emitter = callProperties.call;
  let responseMessage: ResponseType | null = null;
  let receivedStatus = false;
  /* Held in a separate nullable variable so that the listener can drop its
   * reference once the final status has been handled. */
  let pendingStackError: Error | null = callerStackError;
  call.start(callProperties.metadata, {
    onReceiveMetadata: (metadata) => {
      emitter.emit('metadata', metadata);
    },
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any) {
      if (responseMessage !== null) {
        call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
      }
      responseMessage = message;
    },
    onReceiveStatus(status: StatusObject) {
      // Only the first status is reported; later ones are ignored.
      if (receivedStatus) {
        return;
      }
      receivedStatus = true;
      if (status.code === Status.OK) {
        if (responseMessage === null) {
          // An OK status without a message is reported as an error.
          const callerStack = getErrorStackString(pendingStackError!);
          callProperties.callback!(
            callErrorFromStatus(
              {
                code: Status.INTERNAL,
                details: 'No message received',
                metadata: status.metadata,
              },
              callerStack
            )
          );
        } else {
          callProperties.callback!(null, responseMessage);
        }
      } else {
        const callerStack = getErrorStackString(pendingStackError!);
        callProperties.callback!(callErrorFromStatus(status, callerStack));
      }
      /* Avoid retaining the caller stack Error in the listener's closure
       * after the outcome has been delivered. */
      pendingStackError = null;
      emitter.emit('status', status);
    },
  });
}

/**
 * A generic gRPC client. Primarily useful as a base class for all generated
 * clients.
 */
export class Client {
  private readonly [CHANNEL_SYMBOL]: Channel;
  private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
  private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
  private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;

  /**
   * @param address Address handed to the channel (or channel factory). Not
   *     used when `options.channelOverride` is given.
   * @param credentials Credentials handed to the channel (or channel
   *     factory). Not used when `options.channelOverride` is given.
   * @param options Channel options plus client-specific settings. The object
   *     is copied, so the caller's object is never modified.
   * @throws Error if both `interceptors` and `interceptor_providers` are
   *     non-empty.
   */
  constructor(
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions = {}
  ) {
    // Work on a shallow copy: client-only keys are deleted below.
    options = Object.assign({}, options);
    this[INTERCEPTOR_SYMBOL] = options.interceptors ?? [];
    delete options.interceptors;
    this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers ?? [];
    delete options.interceptor_providers;
    if (
      this[INTERCEPTOR_SYMBOL].length > 0 &&
      this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0
    ) {
      throw new Error(
        'Both interceptors and interceptor_providers were passed as options ' +
          'to the client constructor. Only one of these is allowed.'
      );
    }
    this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
      options.callInvocationTransformer;
    delete options.callInvocationTransformer;
    if (options.channelOverride) {
      this[CHANNEL_SYMBOL] = options.channelOverride;
    } else if (options.channelFactoryOverride) {
      const channelFactoryOverride = options.channelFactoryOverride;
      // The factory itself is not forwarded as an option.
      delete options.channelFactoryOverride;
      this[CHANNEL_SYMBOL] = channelFactoryOverride(
        address,
        credentials,
        options
      );
    } else {
      this[CHANNEL_SYMBOL] = new ChannelImplementation(
        address,
        credentials,
        options
      );
    }
  }

  /** Closes the channel used by this client. */
  close(): void {
    this[CHANNEL_SYMBOL].close();
  }

  /** Returns the channel used by this client. */
  getChannel(): Channel {
    return this[CHANNEL_SYMBOL];
  }

  /**
   * Waits until the channel reports `ConnectivityState.READY`.
   *
   * The first check is deferred with `setImmediate`, so `callback` is never
   * invoked synchronously.
   *
   * @param deadline Deadline passed to each connectivity-state watch.
   * @param callback Called with no argument once the channel is ready, with
   *     `Error('Failed to connect before the deadline')` when a watch reports
   *     an error, or with `Error('The channel has been closed')` when querying
   *     or watching the channel state throws.
   */
  waitForReady(deadline: Deadline, callback: (error?: Error) => void): void {
    const checkState = (err?: Error) => {
      if (err) {
        callback(new Error('Failed to connect before the deadline'));
        return;
      }
      let newState;
      try {
        // NOTE(review): the `true` argument presumably asks the channel to
        // start connecting — confirm against the Channel interface.
        newState = this[CHANNEL_SYMBOL].getConnectivityState(true);
      } catch (e) {
        callback(new Error('The channel has been closed'));
        return;
      }
      if (newState === ConnectivityState.READY) {
        callback();
      } else {
        try {
          // Re-check when the state moves away from the one just observed.
          this[CHANNEL_SYMBOL].watchConnectivityState(
            newState,
            deadline,
            checkState
          );
        } catch (e) {
          callback(new Error('The channel has been closed'));
        }
      }
    };
    setImmediate(checkState);
  }

  /**
   * Resolves the optional trailing arguments of the single-response request
   * methods. Accepted forms are `(callback)`, `(metadata, callback)`,
   * `(options, callback)` and `(metadata, options, callback)`; omitted
   * metadata defaults to a new `Metadata` and omitted options to `{}`.
   *
   * @throws Error('Incorrect arguments passed') for any other combination.
   */
  private checkOptionalUnaryResponseArguments<ResponseType>(
    arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
    arg2?: CallOptions | UnaryCallback<ResponseType>,
    arg3?: UnaryCallback<ResponseType>
  ): {
    metadata: Metadata;
    options: CallOptions;
    callback: UnaryCallback<ResponseType>;
  } {
    if (isFunction(arg1)) {
      return { metadata: new Metadata(), options: {}, callback: arg1 };
    } else if (isFunction(arg2)) {
      if (arg1 instanceof Metadata) {
        return { metadata: arg1, options: {}, callback: arg2 };
      } else {
        return { metadata: new Metadata(), options: arg1, callback: arg2 };
      }
    } else {
      if (
        !(
          arg1 instanceof Metadata &&
          arg2 instanceof Object &&
          isFunction(arg3)
        )
      ) {
        throw new Error('Incorrect arguments passed');
      }
      return { metadata: arg1, options: arg2, callback: arg3 };
    }
  }

  /**
   * Makes a unary request: sends `argument`, half-closes the call and reports
   * the single response (or an error) through `callback`.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for the request message.
   * @param deserialize Deserializer for the response message.
   * @param argument The request message.
   * @param metadata Optional metadata (or the options / callback when
   *     metadata is omitted).
   * @param options Optional call options (or the callback when omitted).
   * @param callback Completion callback.
   * @returns The call object, which emits `'metadata'` and `'status'`.
   * @throws Error('Incorrect arguments passed') if the trailing arguments do
   *     not match one of the overloads.
   */
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientUnaryCall {
    const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: false,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument: argument,
      metadata: checkedArguments.metadata,
      call: new ClientUnaryCallImpl(),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const emitter: ClientUnaryCall = callProperties.call;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    emitter.call = call;
    /* Created here, not in the helper, so that the recorded stack starts at
     * this method. */
    const callerStackError = new Error();
    startSingleResponseCall(call, callProperties, callerStackError);
    call.sendMessage(argument);
    call.halfClose();
    return emitter;
  }

  /**
   * Makes a client-streaming request: the caller writes request messages to
   * the returned stream, and the single response (or an error) is reported
   * through `callback`.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for request messages.
   * @param deserialize Deserializer for the response message.
   * @param metadata Optional metadata (or the options / callback when
   *     metadata is omitted).
   * @param options Optional call options (or the callback when omitted).
   * @param callback Completion callback.
   * @returns The writable call stream, which also emits `'metadata'` and
   *     `'status'`.
   * @throws Error('Incorrect arguments passed') if the trailing arguments do
   *     not match one of the overloads.
   */
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType> {
    const checkedArguments = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: true,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientWritableStreamImpl<RequestType>(serialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
      callback: checkedArguments.callback,
    };
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const emitter: ClientWritableStream<RequestType> = callProperties.call as ClientWritableStream<RequestType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    emitter.call = call;
    /* Created here, not in the helper, so that the recorded stack starts at
     * this method. */
    const callerStackError = new Error();
    startSingleResponseCall(call, callProperties, callerStackError);
    return emitter;
  }

  /**
   * Resolves the optional trailing arguments of the streaming-response
   * request methods. Accepted forms are `()`, `(metadata)`, `(options)` and
   * `(metadata, options)`; omitted metadata defaults to a new `Metadata` and
   * omitted options to `{}`.
   */
  private checkMetadataAndOptions(
    arg1?: Metadata | CallOptions,
    arg2?: CallOptions
  ): { metadata: Metadata; options: CallOptions } {
    let metadata: Metadata;
    let options: CallOptions;
    if (arg1 instanceof Metadata) {
      metadata = arg1;
      if (arg2) {
        options = arg2;
      } else {
        options = {};
      }
    } else {
      if (arg1) {
        options = arg1;
      } else {
        options = {};
      }
      metadata = new Metadata();
    }
    return { metadata, options };
  }

  /**
   * Makes a server-streaming request: sends `argument`, half-closes the call
   * and delivers the response messages through the returned readable stream.
   *
   * The stream is ended when the status arrives. A non-OK status is emitted
   * as an `'error'` event before the `'status'` event.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for the request message.
   * @param deserialize Deserializer for response messages.
   * @param argument The request message.
   * @param metadata Optional metadata (or the options when metadata is
   *     omitted).
   * @param options Optional call options.
   * @returns The readable call stream.
   */
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientReadableStream<ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: false,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument: argument,
      metadata: checkedArguments.metadata,
      call: new ClientReadableStreamImpl<ResponseType>(deserialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
    };
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const stream: ClientReadableStream<ResponseType> = callProperties.call as ClientReadableStream<ResponseType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    stream.call = call;
    let receivedStatus = false;
    let callerStackError: Error | null = new Error();
    call.start(callProperties.metadata, {
      onReceiveMetadata(metadata: Metadata) {
        stream.emit('metadata', metadata);
      },
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage(message: any) {
        stream.push(message);
      },
      onReceiveStatus(status: StatusObject) {
        // Only the first status is reported; later ones are ignored.
        if (receivedStatus) {
          return;
        }
        receivedStatus = true;
        stream.push(null);
        if (status.code !== Status.OK) {
          const callerStack = getErrorStackString(callerStackError!);
          stream.emit('error', callErrorFromStatus(status, callerStack));
        }
        /* Avoid retaining the caller stack Error in the listener's closure
         * after the final status has been handled. */
        callerStackError = null;
        stream.emit('status', status);
      },
    });
    call.sendMessage(argument);
    call.halfClose();
    return stream;
  }

  /**
   * Makes a bidirectional-streaming request: the caller writes request
   * messages to, and reads response messages from, the returned duplex
   * stream.
   *
   * The readable side is ended when the status arrives. A non-OK status is
   * emitted as an `'error'` event before the `'status'` event.
   *
   * @param method Path of the method to call.
   * @param serialize Serializer for request messages.
   * @param deserialize Deserializer for response messages.
   * @param metadata Optional metadata (or the options when metadata is
   *     omitted).
   * @param options Optional call options.
   * @returns The duplex call stream.
   */
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType> {
    const checkedArguments = this.checkMetadataAndOptions(metadata, options);
    const methodDefinition: ClientMethodDefinition<
      RequestType,
      ResponseType
    > = {
      path: method,
      requestStream: true,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: checkedArguments.metadata,
      call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
        serialize,
        deserialize
      ),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: methodDefinition,
      callOptions: checkedArguments.options,
    };
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }
    const stream: ClientDuplexStream<
      RequestType,
      ResponseType
    > = callProperties.call as ClientDuplexStream<RequestType, ResponseType>;
    const call = createInterceptingCall(
      this[INTERCEPTOR_SYMBOL],
      this[INTERCEPTOR_PROVIDER_SYMBOL],
      callProperties
    );
    /* This needs to happen before the emitter is used. Unfortunately we can't
     * enforce this with the type system. We need to construct this emitter
     * before calling the CallInvocationTransformer, and we need to create the
     * call after that. */
    stream.call = call;
    let receivedStatus = false;
    let callerStackError: Error | null = new Error();
    call.start(callProperties.metadata, {
      onReceiveMetadata(metadata: Metadata) {
        stream.emit('metadata', metadata);
      },
      onReceiveMessage(message: Buffer) {
        stream.push(message);
      },
      onReceiveStatus(status: StatusObject) {
        // Only the first status is reported; later ones are ignored.
        if (receivedStatus) {
          return;
        }
        receivedStatus = true;
        stream.push(null);
        if (status.code !== Status.OK) {
          const callerStack = getErrorStackString(callerStackError!);
          stream.emit('error', callErrorFromStatus(status, callerStack));
        }
        /* Avoid retaining the caller stack Error in the listener's closure
         * after the final status has been handled. */
        callerStackError = null;
        stream.emit('status', status);
      },
    });
    return stream;
  }
}