UNPKG

5.45 kBPlain TextView Raw
/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18import { EventEmitter } from 'events';
19import { Duplex, Readable, Writable } from 'stream';
20
21import { StatusObject, MessageContext } from './call-interface';
22import { Status } from './constants';
23import { EmitterAugmentation1 } from './events';
24import { Metadata } from './metadata';
25import { ObjectReadable, ObjectWritable, WriteCallback } from './object-stream';
26import { InterceptingCallInterface } from './client-interceptors';
27
/**
 * The error shape produced for failed calls: a regular `Error` that also
 * carries every field of a `StatusObject`.
 */
export type ServiceError = StatusObject & Error;
32
/**
 * Common shape shared by every user-facing object returned from a client-side
 * method call. It is an `EventEmitter` whose `'metadata'` and `'status'`
 * events are typed through `EmitterAugmentation1`.
 */
export type SurfaceCall =
  {
    /** The underlying call object; optional, so it may not be attached yet. */
    call?: InterceptingCallInterface;
    /** Cancels the call. */
    cancel(): void;
    /** Returns a string identifying the peer of the call. */
    getPeer(): string;
  }
  & EmitterAugmentation1<'metadata', Metadata>
  & EmitterAugmentation1<'status', StatusObject>
  & EventEmitter;
43
/**
 * The value returned by a unary method call. It adds nothing to
 * `SurfaceCall`; the alias exists so unary calls have their own name.
 */
export type ClientUnaryCall = SurfaceCall;
48
/**
 * The value returned by a server-streaming method call: a `SurfaceCall` that
 * is also an object-mode readable stream of `ResponseType` values and exposes
 * the `deserialize` function for turning a `Buffer` into a `ResponseType`.
 */
export type ClientReadableStream<ResponseType> =
  { deserialize: (chunk: Buffer) => ResponseType }
  & SurfaceCall
  & ObjectReadable<ResponseType>;
56
/**
 * The value returned by a client-streaming method call: a `SurfaceCall` that
 * is also an object-mode writable stream of `RequestType` values and exposes
 * the `serialize` function for turning a `RequestType` into a `Buffer`.
 */
export type ClientWritableStream<RequestType> =
  { serialize: (value: RequestType) => Buffer }
  & SurfaceCall
  & ObjectWritable<RequestType>;
64
/**
 * The value returned by a bidirectional-streaming method call: the
 * combination of a writable stream of requests and a readable stream of
 * responses.
 */
export type ClientDuplexStream<RequestType, ResponseType> =
  ClientWritableStream<RequestType> & ClientReadableStream<ResponseType>;
72
/**
 * Construct a ServiceError from a StatusObject. This function exists primarily
 * as an attempt to make the error stack trace clearly communicate that the
 * error is not necessarily a problem in gRPC itself.
 *
 * The message has the form `<code> <code name>: <details>`. The stack is the
 * stack captured here, followed by `for call at` and the caller's stack.
 *
 * @param status The status whose fields are copied onto the returned error.
 * @param callerStack Stack text appended after the `for call at` marker.
 * @returns An `Error` carrying the fields of `status` and the combined stack.
 */
export function callErrorFromStatus(
  status: StatusObject,
  callerStack: string
): ServiceError {
  const message = `${status.code} ${Status[status.code]}: ${status.details}`;
  const error = new Error(message);
  const stack = `${error.stack}\nfor call at\n${callerStack}`;
  // Decorate the error that was already created instead of allocating a
  // second, otherwise identical Error just to overwrite its stack.
  return Object.assign(error, status, { stack });
}
85
/**
 * Event-emitter handle for a unary call. Both methods delegate to `call`
 * and tolerate it being unset.
 */
export class ClientUnaryCallImpl
  extends EventEmitter
  implements ClientUnaryCall {
  /** Underlying call; not set by the constructor. */
  public call?: InterceptingCallInterface;

  /**
   * Cancels the underlying call with `Status.CANCELLED`. Does nothing when
   * no call is attached.
   */
  cancel(): void {
    const details = 'Cancelled on client';
    this.call?.cancelWithStatus(Status.CANCELLED, details);
  }

  /**
   * Returns the peer reported by the underlying call, or `'unknown'` when no
   * call is attached or the call reports no peer.
   */
  getPeer(): string {
    const peer = this.call?.getPeer();
    return peer ?? 'unknown';
  }
}
102
/**
 * Object-mode readable stream for a server-streaming call. Every method
 * delegates to `call` and tolerates it being unset.
 */
export class ClientReadableStreamImpl<ResponseType>
  extends Readable
  implements ClientReadableStream<ResponseType> {
  /** Underlying call; not set by the constructor. */
  public call?: InterceptingCallInterface;
  /** Function that turns a received `Buffer` into a `ResponseType`. */
  readonly deserialize: (chunk: Buffer) => ResponseType;

  /**
   * @param deserialize Stored on the instance as `deserialize`.
   */
  constructor(deserialize: (chunk: Buffer) => ResponseType) {
    super({ objectMode: true });
    this.deserialize = deserialize;
  }

  /**
   * Cancels the underlying call with `Status.CANCELLED`. Does nothing when
   * no call is attached.
   */
  cancel(): void {
    const details = 'Cancelled on client';
    this.call?.cancelWithStatus(Status.CANCELLED, details);
  }

  /**
   * Returns the peer reported by the underlying call, or `'unknown'` when no
   * call is attached or the call reports no peer.
   */
  getPeer(): string {
    const peer = this.call?.getPeer();
    return peer ?? 'unknown';
  }

  /**
   * Readable hook: asks the underlying call to start a read. The requested
   * size is ignored.
   */
  _read(_size: number): void {
    const activeCall = this.call;
    activeCall?.startRead();
  }
}
123
/**
 * Object-mode writable stream for a client-streaming call. Every method
 * delegates to `call` and tolerates it being unset.
 */
export class ClientWritableStreamImpl<RequestType>
  extends Writable
  implements ClientWritableStream<RequestType> {
  /** Underlying call; not set by the constructor. */
  public call?: InterceptingCallInterface;

  /**
   * @param serialize Function that turns a `RequestType` into a `Buffer`;
   *     stored on the instance as `serialize`.
   */
  constructor(readonly serialize: (value: RequestType) => Buffer) {
    super({ objectMode: true });
  }

  /**
   * Cancels the underlying call with `Status.CANCELLED`. Does nothing when
   * no call is attached.
   */
  cancel(): void {
    this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
  }

  /**
   * Returns the peer reported by the underlying call, or `'unknown'` when no
   * call is attached or the call reports no peer.
   */
  getPeer(): string {
    return this.call?.getPeer() ?? 'unknown';
  }

  /**
   * Writable hook: hands one message to the underlying call together with a
   * context holding the stream callback.
   *
   * @param chunk The message to send.
   * @param encoding Converted with `Number()`; when the result is not NaN it
   *     is forwarded as `context.flags`. Presumably callers pass numeric
   *     write flags in the encoding slot — confirm against the call sites.
   * @param cb Stored as `context.callback`. This method never invokes it
   *     itself, so with no call attached the chunk is dropped and `cb` is
   *     not called.
   */
  _write(chunk: RequestType, encoding: string, cb: WriteCallback): void {
    const flags = Number(encoding);
    const context: MessageContext = Number.isNaN(flags)
      ? { callback: cb }
      : { callback: cb, flags };
    this.call?.sendMessageWithContext(context, chunk);
  }

  /**
   * Writable hook run when the stream is ending: half-closes the underlying
   * call if one is attached, then always signals completion.
   *
   * @param cb Completion callback, invoked with no arguments.
   */
  _final(cb: (error?: Error | null) => void): void {
    this.call?.halfClose();
    cb();
  }
}
156
/**
 * Object-mode duplex stream for a bidirectional-streaming call. Every method
 * delegates to `call` and tolerates it being unset.
 */
export class ClientDuplexStreamImpl<RequestType, ResponseType>
  extends Duplex
  implements ClientDuplexStream<RequestType, ResponseType> {
  /** Underlying call; not set by the constructor. */
  public call?: InterceptingCallInterface;

  /**
   * @param serialize Function that turns a `RequestType` into a `Buffer`;
   *     stored on the instance as `serialize`.
   * @param deserialize Function that turns a `Buffer` into a `ResponseType`;
   *     stored on the instance as `deserialize`.
   */
  constructor(
    readonly serialize: (value: RequestType) => Buffer,
    readonly deserialize: (chunk: Buffer) => ResponseType
  ) {
    super({ objectMode: true });
  }

  /**
   * Cancels the underlying call with `Status.CANCELLED`. Does nothing when
   * no call is attached.
   */
  cancel(): void {
    this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
  }

  /**
   * Returns the peer reported by the underlying call, or `'unknown'` when no
   * call is attached or the call reports no peer.
   */
  getPeer(): string {
    return this.call?.getPeer() ?? 'unknown';
  }

  /**
   * Readable hook: asks the underlying call to start a read. The requested
   * size is ignored.
   */
  _read(_size: number): void {
    this.call?.startRead();
  }

  /**
   * Writable hook: hands one message to the underlying call together with a
   * context holding the stream callback.
   *
   * @param chunk The message to send.
   * @param encoding Converted with `Number()`; when the result is not NaN it
   *     is forwarded as `context.flags`. Presumably callers pass numeric
   *     write flags in the encoding slot — confirm against the call sites.
   * @param cb Stored as `context.callback`. This method never invokes it
   *     itself, so with no call attached the chunk is dropped and `cb` is
   *     not called.
   */
  _write(chunk: RequestType, encoding: string, cb: WriteCallback): void {
    const flags = Number(encoding);
    const context: MessageContext = Number.isNaN(flags)
      ? { callback: cb }
      : { callback: cb, flags };
    this.call?.sendMessageWithContext(context, chunk);
  }

  /**
   * Writable hook run when the writable side is ending: half-closes the
   * underlying call if one is attached, then always signals completion.
   *
   * @param cb Completion callback, invoked with no arguments.
   */
  _final(cb: (error?: Error | null) => void): void {
    this.call?.halfClose();
    cb();
  }
}