UNPKG

5.45 kBPlain TextView Raw
1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18import { EventEmitter } from 'events';
19import { Duplex, Readable, Writable } from 'stream';
20
21import { StatusObject, MessageContext } from './call-interface';
22import { Status } from './constants';
23import { EmitterAugmentation1 } from './events';
24import { Metadata } from './metadata';
25import { ObjectReadable, ObjectWritable, WriteCallback } from './object-stream';
26import { InterceptingCallInterface } from './client-interceptors';
27
28/**
29 * A type extending the built-in Error object with additional fields.
30 */
31export type ServiceError = StatusObject & Error;
32
33/**
34 * A base type for all user-facing values returned by client-side method calls.
35 */
36export type SurfaceCall = {
37 call?: InterceptingCallInterface;
38 cancel(): void;
39 getPeer(): string;
40} & EmitterAugmentation1<'metadata', Metadata> &
41 EmitterAugmentation1<'status', StatusObject> &
42 EventEmitter;
43
44/**
45 * A type representing the return value of a unary method call.
46 */
47export type ClientUnaryCall = SurfaceCall;
48
49/**
50 * A type representing the return value of a server stream method call.
51 */
52export type ClientReadableStream<ResponseType> = {
53 deserialize: (chunk: Buffer) => ResponseType;
54} & SurfaceCall &
55 ObjectReadable<ResponseType>;
56
57/**
58 * A type representing the return value of a client stream method call.
59 */
60export type ClientWritableStream<RequestType> = {
61 serialize: (value: RequestType) => Buffer;
62} & SurfaceCall &
63 ObjectWritable<RequestType>;
64
65/**
66 * A type representing the return value of a bidirectional stream method call.
67 */
68export type ClientDuplexStream<RequestType, ResponseType> =
69 ClientWritableStream<RequestType> & ClientReadableStream<ResponseType>;
70
71/**
72 * Construct a ServiceError from a StatusObject. This function exists primarily
73 * as an attempt to make the error stack trace clearly communicate that the
74 * error is not necessarily a problem in gRPC itself.
75 * @param status
76 */
77export function callErrorFromStatus(
78 status: StatusObject,
79 callerStack: string
80): ServiceError {
81 const message = `${status.code} ${Status[status.code]}: ${status.details}`;
82 const error = new Error(message);
83 const stack = `${error.stack}\nfor call at\n${callerStack}`;
84 return Object.assign(new Error(message), status, { stack });
85}
86
87export class ClientUnaryCallImpl
88 extends EventEmitter
89 implements ClientUnaryCall
90{
91 public call?: InterceptingCallInterface;
92 constructor() {
93 super();
94 }
95
96 cancel(): void {
97 this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
98 }
99
100 getPeer(): string {
101 return this.call?.getPeer() ?? 'unknown';
102 }
103}
104
105export class ClientReadableStreamImpl<ResponseType>
106 extends Readable
107 implements ClientReadableStream<ResponseType>
108{
109 public call?: InterceptingCallInterface;
110 constructor(readonly deserialize: (chunk: Buffer) => ResponseType) {
111 super({ objectMode: true });
112 }
113
114 cancel(): void {
115 this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
116 }
117
118 getPeer(): string {
119 return this.call?.getPeer() ?? 'unknown';
120 }
121
122 _read(_size: number): void {
123 this.call?.startRead();
124 }
125}
126
127export class ClientWritableStreamImpl<RequestType>
128 extends Writable
129 implements ClientWritableStream<RequestType>
130{
131 public call?: InterceptingCallInterface;
132 constructor(readonly serialize: (value: RequestType) => Buffer) {
133 super({ objectMode: true });
134 }
135
136 cancel(): void {
137 this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
138 }
139
140 getPeer(): string {
141 return this.call?.getPeer() ?? 'unknown';
142 }
143
144 _write(chunk: RequestType, encoding: string, cb: WriteCallback) {
145 const context: MessageContext = {
146 callback: cb,
147 };
148 const flags = Number(encoding);
149 if (!Number.isNaN(flags)) {
150 context.flags = flags;
151 }
152 this.call?.sendMessageWithContext(context, chunk);
153 }
154
155 _final(cb: Function) {
156 this.call?.halfClose();
157 cb();
158 }
159}
160
161export class ClientDuplexStreamImpl<RequestType, ResponseType>
162 extends Duplex
163 implements ClientDuplexStream<RequestType, ResponseType>
164{
165 public call?: InterceptingCallInterface;
166 constructor(
167 readonly serialize: (value: RequestType) => Buffer,
168 readonly deserialize: (chunk: Buffer) => ResponseType
169 ) {
170 super({ objectMode: true });
171 }
172
173 cancel(): void {
174 this.call?.cancelWithStatus(Status.CANCELLED, 'Cancelled on client');
175 }
176
177 getPeer(): string {
178 return this.call?.getPeer() ?? 'unknown';
179 }
180
181 _read(_size: number): void {
182 this.call?.startRead();
183 }
184
185 _write(chunk: RequestType, encoding: string, cb: WriteCallback) {
186 const context: MessageContext = {
187 callback: cb,
188 };
189 const flags = Number(encoding);
190 if (!Number.isNaN(flags)) {
191 context.flags = flags;
192 }
193 this.call?.sendMessageWithContext(context, chunk);
194 }
195
196 _final(cb: Function) {
197 this.call?.halfClose();
198 cb();
199 }
200}