/*
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18import { CallCredentials } from './call-credentials';
19import { Status } from './constants';
20import { Deadline } from './deadline';
21import { Metadata } from './metadata';
22import { ServerSurfaceCall } from './server-call';
23
/**
 * Options that accompany a call stream.
 *
 * NOTE(review): the meaning of each field is defined by the code that
 * consumes these options, which is outside this file. The member notes are
 * inferred from names and types only — confirm before relying on them.
 */
export interface CallStreamOptions {
  /** Deadline for the call; the representation is defined in `./deadline`. */
  deadline: Deadline;
  /** Numeric flags for the call. The individual bit meanings are not visible here. */
  flags: number;
  /** Host associated with the call (presumably the target authority — TODO confirm). */
  host: string;
  /** Server-side call this call is associated with, or `null` when there is none. */
  parentCall: ServerSurfaceCall | null;
}
30
/** {@link CallStreamOptions} with every field made optional. */
export type PartialCallStreamOptions = Partial<CallStreamOptions>;
32
/**
 * Status of a call, as passed to the `onReceiveStatus` listener callbacks
 * declared in this file.
 */
export interface StatusObject {
  /** Status code, one of the values of `Status` from `./constants`. */
  code: Status;
  /** Free-form details string accompanying the code. */
  details: string;
  /** Metadata delivered together with the status. */
  metadata: Metadata;
}
38
/**
 * A {@link StatusObject} whose `metadata` may be `null`; `code` and
 * `details` keep the types they have on {@link StatusObject}.
 */
export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
  metadata: Metadata | null;
};
42
/**
 * Flags describing how a message should be written.
 *
 * The values are distinct powers of two, so several flags can be combined
 * with bitwise OR into a single number (presumably the numeric `flags`
 * fields used elsewhere in this file — TODO confirm against callers).
 * The per-member behaviour is implemented outside this file.
 */
export const enum WriteFlags {
  BufferHint = 1,
  NoCompress = 2,
  WriteThrough = 4,
}
48
/** A message to be written, with optional write flags. */
export interface WriteObject {
  /** Message bytes. */
  message: Buffer;
  /** Optional numeric flags (presumably a combination of `WriteFlags` — TODO confirm). */
  flags?: number;
}
53
/**
 * Callback-style handler for received metadata.
 *
 * The handler receives the metadata and a `next` function, and passes the
 * (possibly modified) metadata onward by calling `next`.
 */
export interface MetadataListener {
  (metadata: Metadata, next: (metadata: Metadata) => void): void;
}
57
/**
 * Callback-style handler for a received message.
 *
 * The handler receives the message and a `next` function, and passes the
 * (possibly modified) message onward by calling `next`. Messages are typed
 * `any` because this file places no constraint on their shape.
 */
export interface MessageListener {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
62
/**
 * Callback-style handler for a received status.
 *
 * The handler receives the status and a `next` function, and passes the
 * (possibly modified) status onward by calling `next`.
 */
export interface StatusListener {
  (status: StatusObject, next: (status: StatusObject) => void): void;
}
66
/**
 * A listener that provides all three callback-style handlers. Each handler
 * takes the received value plus a `next` function used to pass it onward.
 */
export interface FullListener {
  /** Handler for received metadata. */
  onReceiveMetadata: MetadataListener;
  /** Handler for each received message. */
  onReceiveMessage: MessageListener;
  /** Handler for the received status. */
  onReceiveStatus: StatusListener;
}
72
/** A {@link FullListener} in which any of the handlers may be omitted. */
export type Listener = Partial<FullListener>;
74
/**
 * An object with methods for handling the responses to a call.
 *
 * Unlike the callback-style handlers of `FullListener`, these methods take
 * only the received value and no `next` function.
 */
export interface InterceptingListener {
  /** Handles received metadata. */
  onReceiveMetadata(metadata: Metadata): void;
  /** Handles a received message. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onReceiveMessage(message: any): void;
  /** Handles the received status. */
  onReceiveStatus(status: StatusObject): void;
}
84
/**
 * Type guard that tells an `InterceptingListener` apart from a
 * callback-style `Listener`.
 *
 * The check is a heuristic on the declared parameter count of
 * `onReceiveMetadata`: the `InterceptingListener` form takes one parameter
 * (`metadata`), while the `Listener` form takes two (`metadata`, `next`).
 * A handler whose `length` is not exactly 1 (for example, one declared with
 * no parameters, or with rest/default parameters) is therefore reported as
 * not intercepting.
 *
 * @param listener The listener to classify.
 * @returns `true` when `listener.onReceiveMetadata` is a function that
 *   declares exactly one parameter; `false` otherwise, including when the
 *   handler is missing or `null`.
 */
export function isInterceptingListener(
  listener: Listener | InterceptingListener
): listener is InterceptingListener {
  const handler = listener.onReceiveMetadata;
  // `typeof` (rather than `!== undefined`) keeps a `null` handler from
  // throwing when `.length` is read.
  return typeof handler === 'function' && handler.length === 1;
}
93
/**
 * Bridges a callback-style `FullListener` to the next `InterceptingListener`.
 *
 * Every received value is first handed to `listener`, which passes a
 * (possibly modified) value onward by calling its `next` callback — either
 * synchronously or at some later point. Because those callbacks can finish
 * out of order, results are held back so that `nextListener` sees metadata
 * before a message, and a message before the status.
 *
 * Only one message can be held back while metadata is being processed; a
 * second message completing in that window replaces the first.
 */
export class InterceptingListenerImpl implements InterceptingListener {
  /** True from `onReceiveMetadata` until `listener` calls `next` for it. */
  private processingMetadata = false;
  /**
   * Whether `pendingMessage` currently holds a held-back message. Tracked
   * separately so that falsy message values are still delivered.
   */
  private hasPendingMessage = false;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private pendingMessage: any;
  /** True from `onReceiveMessage` until `listener` calls `next` for it. */
  private processingMessage = false;
  /** Status held back until metadata and message processing have finished. */
  private pendingStatus: StatusObject | null = null;

  constructor(
    private listener: FullListener,
    private nextListener: InterceptingListener
  ) {}

  /** Forwards the held-back message, if any, and empties the slot. */
  private processPendingMessage() {
    if (this.hasPendingMessage) {
      this.nextListener.onReceiveMessage(this.pendingMessage);
      this.pendingMessage = null;
      this.hasPendingMessage = false;
    }
  }

  /**
   * Forwards the held-back status, if any, once nothing else is in flight.
   *
   * The status stays pending while metadata or a message is still being
   * processed, so it cannot overtake them. The slot is cleared before
   * forwarding, so a held-back status is delivered exactly once.
   */
  private processPendingStatus() {
    if (this.processingMetadata || this.processingMessage) {
      return;
    }
    const status = this.pendingStatus;
    if (status) {
      this.pendingStatus = null;
      this.nextListener.onReceiveStatus(status);
    }
  }

  /**
   * Passes metadata through `listener`. When it calls `next`, the metadata is
   * forwarded, followed by any message and status that were held back.
   */
  onReceiveMetadata(metadata: Metadata): void {
    this.processingMetadata = true;
    this.listener.onReceiveMetadata(metadata, processedMetadata => {
      this.processingMetadata = false;
      this.nextListener.onReceiveMetadata(processedMetadata);
      this.processPendingMessage();
      this.processPendingStatus();
    });
  }

  /**
   * Passes a message through `listener`. The result is held back if metadata
   * is still being processed; otherwise it is forwarded immediately,
   * followed by any held-back status.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onReceiveMessage(message: any): void {
    /* If this listener processes messages asynchronously, the last message may
     * be reordered with respect to the status */
    this.processingMessage = true;
    this.listener.onReceiveMessage(message, msg => {
      this.processingMessage = false;
      if (this.processingMetadata) {
        this.pendingMessage = msg;
        this.hasPendingMessage = true;
      } else {
        this.nextListener.onReceiveMessage(msg);
        this.processPendingStatus();
      }
    });
  }

  /**
   * Passes the status through `listener`. The result is held back while
   * metadata or a message is still being processed; otherwise it is
   * forwarded immediately.
   */
  onReceiveStatus(status: StatusObject): void {
    this.listener.onReceiveStatus(status, processedStatus => {
      if (this.processingMetadata || this.processingMessage) {
        this.pendingStatus = processedStatus;
      } else {
        this.nextListener.onReceiveStatus(processedStatus);
      }
    });
  }
}
154
/**
 * Callback reporting the outcome of a write.
 *
 * May be called with no argument, `null`, or an `Error` (presumably an
 * `Error` signals failure and anything else success — TODO confirm against
 * the code that invokes it).
 */
export interface WriteCallback {
  (error?: Error | null): void;
}
158
/**
 * Optional per-message context passed to `Call.sendMessageWithContext`
 * alongside the message bytes.
 */
export interface MessageContext {
  /** Optional callback for the write of this message. */
  callback?: WriteCallback;
  /** Optional numeric flags (presumably a combination of `WriteFlags` — TODO confirm). */
  flags?: number;
}
163
/**
 * Operations available on a call.
 *
 * NOTE(review): implementations live outside this file. The member notes are
 * inferred from names and signatures only — confirm against an
 * implementation before relying on ordering or side effects.
 */
export interface Call {
  /** Cancels the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns a string identifying the peer of this call. */
  getPeer(): string;
  /** Starts the call with the given metadata; responses go to `listener`. */
  start(metadata: Metadata, listener: InterceptingListener): void;
  /** Sends a message together with its optional callback and flags. */
  sendMessageWithContext(context: MessageContext, message: Buffer): void;
  /** Requests a read (presumably of the next incoming message — TODO confirm). */
  startRead(): void;
  /** Half-closes the call (presumably signals that no more messages will be sent — TODO confirm). */
  halfClose(): void;
  /** Returns the number identifying this call. */
  getCallNumber(): number;
  /** Sets the credentials used for this call. */
  setCredentials(credentials: CallCredentials): void;
}