UNPKG

9.38 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const net = require("net");
4const path = require("path");
5const fs = require("fs-extra");
6const structured_clone_1 = require("@hyurl/structured-clone");
7const advanced_collections_1 = require("advanced-collections");
8const check_iterable_1 = require("check-iterable");
9const isSocketResetError = require("is-socket-reset-error");
10const channel_1 = require("./channel");
11const util_1 = require("../util");
12const values = require("lodash/values");
13const isOwnKey_1 = require("@hyurl/utils/isOwnKey");
14const authorized = Symbol("authorized");
15class RpcServer extends channel_1.RpcChannel {
16 constructor(options, host) {
17 super(options, host);
18 this.server = null;
19 this.registry = util_1.dict();
20 this.clients = new advanced_collections_1.BiMap();
21 this.suspendedTasks = new Map();
22 this.proxyRoot = null;
23 this.enableLifeCycle = false;
24 this.id = this.id || this.dsn;
25 }
26 async open(enableLifeCycle = true) {
27 if (enableLifeCycle) {
28 this.enableLifeCycle = true;
29 for (let mod of values(this.registry)) {
30 await util_1.tryLifeCycleFunction(mod, "init", this.errorHandler);
31 }
32 }
33 if (this.path) {
34 await fs.ensureDir(path.dirname(this.path));
35 if (await fs.pathExists(this.path)) {
36 await fs.unlink(this.path);
37 }
38 }
39 return new Promise((resolve, reject) => {
40 let server = this.server = net.createServer();
41 let listener = () => {
42 server.on("error", (err) => {
43 if (this.errorHandler) {
44 this.errorHandler.call(this, err);
45 }
46 else {
47 console.error(err);
48 process.exit(1);
49 }
50 });
51 resolve(this);
52 };
53 server.once("error", reject)
54 .on("connection", this.handleConnection.bind(this));
55 if (this.path) {
56 server.listen(util_1.absPath(this.path, true), listener);
57 }
58 else if (this.host) {
59 server.listen(this.port, this.host, listener);
60 }
61 else {
62 server.listen(this.port, listener);
63 }
64 });
65 }
66 async close() {
67 await new Promise(resolve => {
68 if (this.server) {
69 let timer = setTimeout(() => {
70 for (let [, socket] of this.clients) {
71 socket.destroy();
72 }
73 }, 1000);
74 this.server.unref();
75 this.server.close(() => {
76 clearTimeout(timer);
77 resolve();
78 });
79 }
80 else {
81 resolve();
82 }
83 });
84 if (this.enableLifeCycle) {
85 await Promise.all(values(this.registry).map(mod => {
86 return util_1.tryLifeCycleFunction(mod, "destroy").catch(err => {
87 this.errorHandler && this.errorHandler(err);
88 });
89 }));
90 }
91 if (this.proxyRoot) {
92 this.proxyRoot["server"] = null;
93 this.proxyRoot = null;
94 }
95 return this;
96 }
97 register(mod) {
98 this.registry[mod.name] = mod;
99 return this;
100 }
101 publish(topic, data, clients) {
102 let sent = false;
103 let socket;
104 let targets = clients || this.clients.keys();
105 for (let id of targets) {
106 if (socket = this.clients.get(id)) {
107 this.dispatch(socket, channel_1.RpcEvents.BROADCAST, topic, data);
108 sent = true;
109 }
110 }
111 return sent;
112 }
113 getClients() {
114 let clients = [];
115 for (let [id] of this.clients) {
116 clients.push(id);
117 }
118 return clients;
119 }
120 dispatch(socket, event, ...data) {
121 if (!socket.destroyed && socket.writable) {
122 if (event === channel_1.RpcEvents.THROW) {
123 data = structured_clone_1.compose(data);
124 }
125 socket.write([event, ...data]);
126 }
127 }
128 handleConnection(socket) {
129 let addr = `${socket.remoteAddress || ""}:${socket.remotePort || ""}`;
130 let destroyWithHandshakeError = () => {
131 socket.destroy(new Error(`Handshake required (client: ${addr})`));
132 };
133 let autoDestroy = setTimeout(destroyWithHandshakeError, 5000);
134 this.bsp.wrap(socket).on("error", err => {
135 if (!isSocketResetError(err) && this.errorHandler) {
136 this.errorHandler(err);
137 }
138 }).on("close", () => {
139 let tasks = this.suspendedTasks.get(socket);
140 if (tasks) {
141 this.suspendedTasks.delete(socket);
142 this.clients.deleteValue(socket);
143 for (let task of tasks.values()) {
144 task.return();
145 }
146 }
147 }).on("data", async (msg) => {
148 if (this.secret && !socket[authorized]) {
149 if (this.secret === msg) {
150 socket[authorized] = true;
151 return;
152 }
153 else {
154 return socket.destroy(new Error(`Connection unauthorized (client: ${addr})`));
155 }
156 }
157 if (this.codec === "BSON" && typeof msg === "object") {
158 msg = Array.from(Object.assign(msg, {
159 length: Object.keys(msg).length
160 }));
161 }
162 if (!Array.isArray(msg))
163 return;
164 let [event, taskId, modName, method, ...args] = msg;
165 if (!this.suspendedTasks.has(socket) &&
166 event !== channel_1.RpcEvents.HANDSHAKE) {
167 return destroyWithHandshakeError();
168 }
169 switch (event) {
170 case channel_1.RpcEvents.HANDSHAKE: {
171 let clientId = String(taskId);
172 clearTimeout(autoDestroy);
173 this.clients.set(clientId, socket);
174 this.suspendedTasks.set(socket, new Map());
175 this.dispatch(socket, channel_1.RpcEvents.CONNECT, clientId, this.id);
176 break;
177 }
178 case channel_1.RpcEvents.PING: {
179 this.dispatch(socket, channel_1.RpcEvents.PONG);
180 break;
181 }
182 case channel_1.RpcEvents.INVOKE: {
183 let data;
184 let tasks = this.suspendedTasks.get(socket);
185 try {
186 let ins = this.registry[modName]();
187 if (isOwnKey_1.default(ins, util_1.readyState) && ins[util_1.readyState] !== 2) {
188 util_1.throwUnavailableError(modName);
189 }
190 let task = ins[method].apply(ins, args);
191 if (task && check_iterable_1.isIteratorLike(task)) {
192 tasks.set(taskId, task);
193 event = channel_1.RpcEvents.INVOKE;
194 }
195 else {
196 data = await task;
197 event = channel_1.RpcEvents.RETURN;
198 }
199 }
200 catch (err) {
201 event = channel_1.RpcEvents.THROW;
202 data = err;
203 }
204 this.dispatch(socket, event, taskId, data);
205 break;
206 }
207 case channel_1.RpcEvents.YIELD:
208 case channel_1.RpcEvents.RETURN:
209 case channel_1.RpcEvents.THROW: {
210 let data, input;
211 let tasks = this.suspendedTasks.get(socket);
212 let task = tasks.get(taskId);
213 try {
214 if (!task) {
215 let callee = `${modName}(<route>).${method}()`;
216 throw new ReferenceError(`${callee} failed (${taskId})`);
217 }
218 else {
219 input = args[0];
220 }
221 if (event === channel_1.RpcEvents.YIELD) {
222 data = await task.next(input);
223 }
224 else if (event === channel_1.RpcEvents.RETURN) {
225 data = await task.return(input);
226 }
227 else {
228 await task.throw(input);
229 }
230 data.done && tasks.delete(taskId);
231 }
232 catch (err) {
233 event = channel_1.RpcEvents.THROW;
234 data = err;
235 task && tasks.delete(taskId);
236 }
237 this.dispatch(socket, event, taskId, data);
238 break;
239 }
240 }
241 });
242 }
243}
244exports.RpcServer = RpcServer;
245//# sourceMappingURL=server.js.map
\No newline at end of file