1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | const net = require("net");
|
4 | const path = require("path");
|
5 | const fs = require("fs-extra");
|
6 | const structured_clone_1 = require("@hyurl/structured-clone");
|
7 | const advanced_collections_1 = require("advanced-collections");
|
8 | const check_iterable_1 = require("check-iterable");
|
9 | const isSocketResetError = require("is-socket-reset-error");
|
10 | const channel_1 = require("./channel");
|
11 | const util_1 = require("../util");
|
12 | const values = require("lodash/values");
|
13 | const isOwnKey_1 = require("@hyurl/utils/isOwnKey");
|
14 | const authorized = Symbol("authorized");
|
/**
 * RPC channel endpoint that accepts client connections over TCP or an IPC
 * socket path and routes their requests to the registered modules.
 *
 * Instance state:
 * - `server`: the `net.Server` created by `open()`, or `null` before that.
 * - `registry`: dictionary of registered modules keyed by `mod.name`.
 * - `clients`: bi-directional map between client ids and their sockets.
 * - `suspendedTasks`: per-socket map of task id -> iterator-like task that
 *   is still in progress. A socket only has an entry after its handshake.
 * - `proxyRoot`: reference released (together with its `server` property)
 *   by `close()`.
 * - `enableLifeCycle`: whether `open()` ran the modules' `init` hooks, and
 *   therefore whether `close()` should run their `destroy` hooks.
 */
class RpcServer extends channel_1.RpcChannel {
    /**
     * @param options Passed through unchanged to `RpcChannel`.
     * @param host Passed through unchanged to `RpcChannel`.
     */
    constructor(options, host) {
        super(options, host);
        this.server = null;
        this.registry = util_1.dict();
        this.clients = new advanced_collections_1.BiMap();
        this.suspendedTasks = new Map();
        this.proxyRoot = null;
        this.enableLifeCycle = false;
        // Fall back to the DSN when the channel was given no explicit id.
        this.id = this.id || this.dsn;
    }

    /**
     * Starts listening for client connections.
     *
     * @param {boolean} [enableLifeCycle=true] When true, the `init` life
     *   cycle function of every registered module is run (sequentially)
     *   before the server starts listening.
     * @returns {Promise<this>} Resolves with this server once it is
     *   listening; rejects with the first `error` the server emits before
     *   that point.
     */
    async open(enableLifeCycle = true) {
        if (enableLifeCycle) {
            this.enableLifeCycle = true;
            for (const mod of values(this.registry)) {
                await util_1.tryLifeCycleFunction(mod, "init", this.errorHandler);
            }
        }
        if (this.path) {
            // Make sure the socket file's directory exists and that no file
            // is left at the socket path before binding to it.
            await fs.ensureDir(path.dirname(this.path));
            if (await fs.pathExists(this.path)) {
                await fs.unlink(this.path);
            }
        }
        return new Promise((resolve, reject) => {
            const server = this.server = net.createServer();
            const listener = () => {
                // The persistent error listener is attached only after the
                // server is listening; start-up errors go to `reject`.
                server.on("error", (err) => {
                    if (this.errorHandler) {
                        this.errorHandler.call(this, err);
                    }
                    else {
                        console.error(err);
                        process.exit(1);
                    }
                });
                resolve(this);
            };
            server.once("error", reject)
                .on("connection", this.handleConnection.bind(this));
            if (this.path) {
                server.listen(util_1.absPath(this.path, true), listener);
            }
            else if (this.host) {
                server.listen(this.port, this.host, listener);
            }
            else {
                server.listen(this.port, listener);
            }
        });
    }

    /**
     * Stops the server, runs the modules' `destroy` life cycle functions
     * (only if `open()` enabled the life cycle) and releases `proxyRoot`.
     *
     * @returns {Promise<this>} Resolves with this server once everything
     *   above has finished.
     */
    async close() {
        await new Promise(resolve => {
            if (this.server) {
                // `server.close()` only calls back after every connection
                // has ended, so force-destroy the remaining client sockets
                // if that has not happened within one second.
                const timer = setTimeout(() => {
                    for (const [, socket] of this.clients) {
                        socket.destroy();
                    }
                }, 1000);
                this.server.unref();
                this.server.close(() => {
                    clearTimeout(timer);
                    resolve();
                });
            }
            else {
                resolve();
            }
        });
        if (this.enableLifeCycle) {
            await Promise.all(values(this.registry).map(mod => {
                return util_1.tryLifeCycleFunction(mod, "destroy").catch(err => {
                    this.errorHandler && this.errorHandler(err);
                });
            }));
        }
        if (this.proxyRoot) {
            this.proxyRoot["server"] = null;
            this.proxyRoot = null;
        }
        return this;
    }

    /**
     * Adds a module to the registry under `mod.name`, replacing any module
     * previously registered under the same name.
     *
     * @param mod The module to serve.
     * @returns {this}
     */
    register(mod) {
        this.registry[mod.name] = mod;
        return this;
    }

    /**
     * Sends a BROADCAST event carrying `topic` and `data` to clients.
     *
     * @param topic Topic forwarded to the clients.
     * @param data Payload forwarded to the clients.
     * @param [clients] Iterable of client ids to target; every connected
     *   client is targeted when omitted.
     * @returns {boolean} `true` if at least one targeted id belonged to a
     *   connected client, `false` otherwise.
     */
    publish(topic, data, clients) {
        let sent = false;
        const targets = clients || this.clients.keys();
        for (const id of targets) {
            const socket = this.clients.get(id);
            if (socket) {
                this.dispatch(socket, channel_1.RpcEvents.BROADCAST, topic, data);
                sent = true;
            }
        }
        return sent;
    }

    /**
     * @returns {Array} The ids of all clients that completed the handshake
     *   and are still connected.
     */
    getClients() {
        const clients = [];
        for (const [id] of this.clients) {
            clients.push(id);
        }
        return clients;
    }

    /**
     * Writes `[event, ...data]` to the socket; does nothing when the socket
     * is destroyed or no longer writable.
     *
     * @param socket The (wrapped) client socket.
     * @param event One of `RpcEvents`.
     * @param data Remaining message fields.
     */
    dispatch(socket, event, ...data) {
        if (!socket.destroyed && socket.writable) {
            if (event === channel_1.RpcEvents.THROW) {
                // THROW payloads are passed through `compose` before being
                // written to the socket.
                data = structured_clone_1.compose(data);
            }
            socket.write([event, ...data]);
        }
    }

    /**
     * Wires up a newly accepted socket: enforces the handshake (and the
     * secret, when one is configured) and then serves PING, INVOKE and
     * YIELD/RETURN/THROW requests until the socket closes.
     *
     * @param socket The socket handed over by the server's `connection`
     *   event.
     */
    handleConnection(socket) {
        const addr = `${socket.remoteAddress || ""}:${socket.remotePort || ""}`;
        // Errors are reported only when an error handler is configured,
        // matching how the rest of the class treats them.
        const reportError = (err) => {
            if (this.errorHandler) {
                this.errorHandler(err);
            }
        };
        const destroyWithHandshakeError = () => {
            socket.destroy(new Error(`Handshake required (client: ${addr})`));
        };
        // Clients get five seconds to send the HANDSHAKE event.
        const autoDestroy = setTimeout(destroyWithHandshakeError, 5000);
        // Ends one suspended task without letting its failure escape the
        // "close" event handler: the task may lack a `return` method,
        // `return()` may throw synchronously, or (for async iterators) it
        // may return a promise that rejects.
        const finishTask = (task) => {
            try {
                if (typeof task.return !== "function") {
                    return;
                }
                const outcome = task.return();
                if (outcome && typeof outcome.then === "function") {
                    outcome.then(null, reportError);
                }
            }
            catch (err) {
                reportError(err);
            }
        };

        this.bsp.wrap(socket).on("error", err => {
            if (!isSocketResetError(err)) {
                reportError(err);
            }
        }).on("close", () => {
            // The socket may close before (or without) a handshake, so the
            // handshake timer must not be left pending.
            clearTimeout(autoDestroy);
            const tasks = this.suspendedTasks.get(socket);
            if (!tasks) {
                return;
            }
            this.suspendedTasks.delete(socket);
            this.clients.deleteValue(socket);
            for (const task of tasks.values()) {
                finishTask(task);
            }
        }).on("data", async (msg) => {
            if (this.secret && !socket[authorized]) {
                // The first message of a connection must be the secret.
                // NOTE(review): `===` is not a constant-time comparison;
                // consider `crypto.timingSafeEqual` if timing attacks are
                // a concern for this deployment.
                if (this.secret === msg) {
                    socket[authorized] = true;
                    return;
                }
                else {
                    return socket.destroy(new Error(`Connection unauthorized (client: ${addr})`));
                }
            }
            // With the BSON codec an array-like object (index keys) is
            // turned back into a real array. `null` is excluded because
            // `typeof null === "object"` and `Object.keys(null)` throws.
            if (this.codec === "BSON" && typeof msg === "object" && msg !== null) {
                msg = Array.from(Object.assign(msg, {
                    length: Object.keys(msg).length
                }));
            }
            if (!Array.isArray(msg)) {
                return;
            }
            const [event, taskId, modName, method, ...args] = msg;
            // Nothing but HANDSHAKE is accepted until the handshake is done.
            if (!this.suspendedTasks.has(socket) &&
                event !== channel_1.RpcEvents.HANDSHAKE) {
                return destroyWithHandshakeError();
            }
            switch (event) {
                case channel_1.RpcEvents.HANDSHAKE: {
                    // In a HANDSHAKE message the second field carries the
                    // client id.
                    const clientId = String(taskId);
                    clearTimeout(autoDestroy);
                    this.clients.set(clientId, socket);
                    this.suspendedTasks.set(socket, new Map());
                    this.dispatch(socket, channel_1.RpcEvents.CONNECT, clientId, this.id);
                    break;
                }
                case channel_1.RpcEvents.PING: {
                    this.dispatch(socket, channel_1.RpcEvents.PONG);
                    break;
                }
                case channel_1.RpcEvents.INVOKE: {
                    let replyEvent;
                    let data;
                    const tasks = this.suspendedTasks.get(socket);
                    try {
                        const ins = this.registry[modName]();
                        // A module that exposes a ready state is only
                        // callable while that state equals 2.
                        if (isOwnKey_1.default(ins, util_1.readyState) && ins[util_1.readyState] !== 2) {
                            util_1.throwUnavailableError(modName);
                        }
                        const task = ins[method].apply(ins, args);
                        if (task && check_iterable_1.isIteratorLike(task)) {
                            // Iterator results are suspended and driven by
                            // later YIELD/RETURN/THROW messages.
                            tasks.set(taskId, task);
                            replyEvent = channel_1.RpcEvents.INVOKE;
                        }
                        else {
                            data = await task;
                            replyEvent = channel_1.RpcEvents.RETURN;
                        }
                    }
                    catch (err) {
                        replyEvent = channel_1.RpcEvents.THROW;
                        data = err;
                    }
                    this.dispatch(socket, replyEvent, taskId, data);
                    break;
                }
                case channel_1.RpcEvents.YIELD:
                case channel_1.RpcEvents.RETURN:
                case channel_1.RpcEvents.THROW: {
                    let replyEvent = event;
                    let data;
                    const tasks = this.suspendedTasks.get(socket);
                    const task = tasks.get(taskId);
                    try {
                        if (!task) {
                            const callee = `${modName}(<route>).${method}()`;
                            throw new ReferenceError(`${callee} failed (${taskId})`);
                        }
                        const input = args[0];
                        if (event === channel_1.RpcEvents.YIELD) {
                            data = await task.next(input);
                        }
                        else if (event === channel_1.RpcEvents.RETURN) {
                            data = await task.return(input);
                        }
                        else {
                            // `throw()` is expected to raise and land in the
                            // catch block below. If the iterator swallows
                            // the error, `data` stays undefined and the
                            // `data.done` access raises a TypeError, which
                            // is likewise answered with THROW.
                            // NOTE(review): confirm whether a swallowed
                            // error should be answered with the iterator
                            // result instead.
                            await task.throw(input);
                        }
                        data.done && tasks.delete(taskId);
                    }
                    catch (err) {
                        replyEvent = channel_1.RpcEvents.THROW;
                        data = err;
                        task && tasks.delete(taskId);
                    }
                    this.dispatch(socket, replyEvent, taskId, data);
                    break;
                }
            }
        });
    }
}
|
244 | exports.RpcServer = RpcServer;
|
245 |
|
\ | No newline at end of file |