UNPKG

9.15 kBJavaScriptView Raw
1"use strict";
2// *****************************************************************************
3// Copyright (C) 2021 Red Hat, Inc. and others.
4//
5// This program and the accompanying materials are made available under the
6// terms of the Eclipse Public License v. 2.0 which is available at
7// http://www.eclipse.org/legal/epl-2.0.
8//
9// This Source Code may also be made available under the following Secondary
10// Licenses when the conditions for such availability set forth in the Eclipse
11// Public License v. 2.0 are satisfied: GNU General Public License, version 2
12// with the GNU Classpath Exception which is available at
13// https://www.gnu.org/software/classpath/license.html.
14//
15// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
16// *****************************************************************************
17/* eslint-disable @typescript-eslint/no-explicit-any */
18Object.defineProperty(exports, "__esModule", { value: true });
19exports.RpcProtocol = void 0;
20const cancellation_1 = require("../cancellation");
21const disposable_1 = require("../disposable");
22const event_1 = require("../event");
23const promise_util_1 = require("../promise-util");
24const rpc_message_encoder_1 = require("./rpc-message-encoder");
25/**
26 * Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send
27 * requests and notifications to the remote side (i.e. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server).
28 * Clients can get a promise for a remote request result that will be either resolved or
29 * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request
30 * Currently, there is no timeout handling for long running requests implemented.
31 * The bi-directional mode can be reconfigured using the {@link RpcProtocolOptions} to construct an RPC protocol instance that acts only as client or server instead.
32 */
33class RpcProtocol {
34 constructor(channel, requestHandler, options = {}) {
35 var _a, _b, _c;
36 this.channel = channel;
37 this.requestHandler = requestHandler;
38 this.pendingRequests = new Map();
39 this.nextMessageId = 0;
40 this.onNotificationEmitter = new event_1.Emitter();
41 this.cancellationTokenSources = new Map();
42 this.toDispose = new disposable_1.DisposableCollection();
43 this.encoder = (_a = options.encoder) !== null && _a !== void 0 ? _a : new rpc_message_encoder_1.MsgPackMessageEncoder();
44 this.decoder = (_b = options.decoder) !== null && _b !== void 0 ? _b : new rpc_message_encoder_1.MsgPackMessageDecoder();
45 this.toDispose.push(this.onNotificationEmitter);
46 channel.onClose(() => this.toDispose.dispose());
47 this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
48 this.mode = (_c = options.mode) !== null && _c !== void 0 ? _c : 'default';
49 if (this.mode !== 'clientOnly' && requestHandler === undefined) {
50 console.error('RPCProtocol was initialized without a request handler but was not set to clientOnly mode.');
51 }
52 }
53 get onNotification() {
54 return this.onNotificationEmitter.event;
55 }
56 handleMessage(message) {
57 if (this.mode !== 'clientOnly') {
58 switch (message.type) {
59 case 5 /* Cancel */: {
60 this.handleCancel(message.id);
61 return;
62 }
63 case 1 /* Request */: {
64 this.handleRequest(message.id, message.method, message.args);
65 return;
66 }
67 case 2 /* Notification */: {
68 this.handleNotify(message.id, message.method, message.args);
69 return;
70 }
71 }
72 }
73 if (this.mode !== 'serverOnly') {
74 switch (message.type) {
75 case 3 /* Reply */: {
76 this.handleReply(message.id, message.res);
77 return;
78 }
79 case 4 /* ReplyErr */: {
80 this.handleReplyErr(message.id, message.err);
81 return;
82 }
83 }
84 }
85 // If the message was not handled until here, it is incompatible with the mode.
86 console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`);
87 }
88 handleReply(id, value) {
89 const replyHandler = this.pendingRequests.get(id);
90 if (replyHandler) {
91 this.pendingRequests.delete(id);
92 replyHandler.resolve(value);
93 }
94 else {
95 throw new Error(`No reply handler for reply with id: ${id}`);
96 }
97 }
98 handleReplyErr(id, error) {
99 try {
100 const replyHandler = this.pendingRequests.get(id);
101 if (replyHandler) {
102 this.pendingRequests.delete(id);
103 replyHandler.reject(error);
104 }
105 else {
106 throw new Error(`No reply handler for error reply with id: ${id}`);
107 }
108 }
109 catch (err) {
110 throw err;
111 }
112 }
113 sendRequest(method, args) {
114 // The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the
115 // args array and the `CANCELLATION_TOKEN_KEY` string instead.
116 const cancellationToken = args.length && cancellation_1.CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;
117 const id = this.nextMessageId++;
118 const reply = new promise_util_1.Deferred();
119 if (cancellationToken) {
120 args.push(RpcProtocol.CANCELLATION_TOKEN_KEY);
121 }
122 this.pendingRequests.set(id, reply);
123 const output = this.channel.getWriteBuffer();
124 this.encoder.request(output, id, method, args);
125 output.commit();
126 if (cancellationToken === null || cancellationToken === void 0 ? void 0 : cancellationToken.isCancellationRequested) {
127 this.sendCancel(id);
128 }
129 else {
130 cancellationToken === null || cancellationToken === void 0 ? void 0 : cancellationToken.onCancellationRequested(() => this.sendCancel(id));
131 }
132 return reply.promise;
133 }
134 sendNotification(method, args) {
135 // If the notification supports a CancellationToken, it needs to be treated like a request
136 // because cancellation does not work with the simplified "fire and forget" approach of simple notifications.
137 if (args.length && cancellation_1.CancellationToken.is(args[args.length - 1])) {
138 this.sendRequest(method, args);
139 return;
140 }
141 const output = this.channel.getWriteBuffer();
142 this.encoder.notification(output, this.nextMessageId++, method, args);
143 output.commit();
144 }
145 sendCancel(requestId) {
146 const output = this.channel.getWriteBuffer();
147 this.encoder.cancel(output, requestId);
148 output.commit();
149 }
150 handleCancel(id) {
151 const cancellationTokenSource = this.cancellationTokenSources.get(id);
152 if (cancellationTokenSource) {
153 cancellationTokenSource.cancel();
154 }
155 }
156 async handleRequest(id, method, args) {
157 const output = this.channel.getWriteBuffer();
158 // Check if the last argument of the received args is the key for indicating that a cancellation token should be used
159 // If so remove the key from the args and create a new cancellation token.
160 const addToken = args.length && args[args.length - 1] === RpcProtocol.CANCELLATION_TOKEN_KEY ? args.pop() : false;
161 if (addToken) {
162 const tokenSource = new cancellation_1.CancellationTokenSource();
163 this.cancellationTokenSources.set(id, tokenSource);
164 args.push(tokenSource.token);
165 }
166 try {
167 const result = await this.requestHandler(method, args);
168 this.cancellationTokenSources.delete(id);
169 this.encoder.replyOK(output, id, result);
170 output.commit();
171 }
172 catch (err) {
173 // In case of an error the output buffer might already contains parts of an message.
174 // => Dispose the current buffer and retrieve a new, clean one for writing the response error.
175 if (disposable_1.Disposable.is(output)) {
176 output.dispose();
177 }
178 const errorOutput = this.channel.getWriteBuffer();
179 this.cancellationTokenSources.delete(id);
180 this.encoder.replyErr(errorOutput, id, err);
181 errorOutput.commit();
182 }
183 }
184 async handleNotify(id, method, args) {
185 if (this.toDispose.disposed) {
186 return;
187 }
188 this.onNotificationEmitter.fire({ method, args });
189 }
190}
191exports.RpcProtocol = RpcProtocol;
192RpcProtocol.CANCELLATION_TOKEN_KEY = 'add.cancellation.token';
193//# sourceMappingURL=rpc-protocol.js.map
\No newline at end of file