UNPKG

15.8 kB · JavaScript · View Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const net = require("net");
4const sequid_1 = require("sequid");
5const backoff_1 = require("backoff");
6const structured_clone_1 = require("@hyurl/structured-clone");
7const isSocketResetError = require("is-socket-reset-error");
8const thenable_generator_1 = require("thenable-generator");
9const channel_1 = require("./channel");
10const util_1 = require("../util");
11const last = require("lodash/last");
12const isOwnKey_1 = require("@hyurl/utils/isOwnKey");
/**
 * Client side of an RPC channel.
 *
 * The client opens a socket to a server, performs a handshake, keeps the
 * connection alive with periodic pings, reconnects with exponential backoff
 * when the socket closes unexpectedly, and routes incoming messages to
 * pending tasks and topic subscribers.
 */
class RpcClient extends channel_1.RpcChannel {
    /**
     * @param options Forwarded unchanged to the `RpcChannel` constructor.
     * @param host Forwarded unchanged to the `RpcChannel` constructor.
     */
    constructor(options, host) {
        super(options, host);
        // One of "initiated", "connecting", "connected" or "closed".
        this.state = "initiated";
        this.socket = null;
        // Registered modules, keyed by module name.
        this.registry = util_1.dict();
        this.taskId = sequid_1.default(0, true);
        // Pending remote calls, keyed by task id.
        this.tasks = new Map();
        // Topic name -> Set of subscriber callbacks.
        this.topics = new Map();
        // Settlers of the `open()` call currently in flight.
        this.finishConnect = null;
        this.rejectConnect = null;
        this.lastActiveTime = Date.now();
        // Timer that destroys the socket when a ping goes unanswered.
        this.selfDestruction = null;
        this.pingTimer = null;
        this.reconnect = null;
        // Defaults for values that are still falsy after the base
        // constructor ran (presumably it copies them from `options`).
        this.id = this.id || Math.random().toString(16).slice(2);
        this.timeout = this.timeout || 5000;
        this.pingInterval = this.pingInterval || 5000;
        this.serverId = this.serverId || this.dsn;
    }
    /** Whether the channel is currently (re)connecting. */
    get connecting() {
        return this.state === "connecting";
    }
    /** Whether the handshake has completed and the channel is usable. */
    get connected() {
        return this.state === "connected";
    }
    /** Whether `close()` has been called. */
    get closed() {
        return this.state === "closed";
    }
    /**
     * Opens the socket and performs the handshake.
     *
     * @returns A promise that resolves with this client once the server has
     *  answered the handshake, and rejects if the channel is already being
     *  opened, has been closed, the socket errors before connecting, the
     *  socket closes during the handshake, or connecting takes longer than
     *  `this.timeout` milliseconds.
     */
    open() {
        return new Promise((resolve, reject) => {
            const { serverId } = this;
            // Throwing inside a promise executor rejects the promise.
            if (this.socket && this.socket.connecting) {
                throw new Error(`Channel to ${serverId} is already open`);
            }
            else if (this.closed) {
                throw new Error(`Cannot reconnect to ${serverId} after closing the channel`);
            }
            // This must be an assignment: the "close" handler installed by
            // `prepareChannel()` only calls `rejectConnect()` while the
            // state is "connecting". Without it, a socket closed during the
            // very first handshake leaves this promise pending forever.
            this.state = "connecting";
            this.finishConnect = () => {
                this.state = "connected";
                this.resume();
                // The timers are created once and survive reconnections.
                if (!this.pingTimer && !this.reconnect) {
                    this.setPingAndReconnectTimer(serverId);
                }
                resolve(this);
            };
            this.rejectConnect = () => {
                reject(new Error(`Unable to connect ${serverId}`));
            };
            const timer = setTimeout(this.rejectConnect, this.timeout);
            const errorListener = (err) => {
                clearTimeout(timer);
                this.socket.removeListener("connect", connectListener);
                reject(err);
            };
            const connectListener = () => {
                clearTimeout(timer);
                // From here on errors are handled by the listeners that
                // `prepareChannel()` installs.
                this.socket.removeListener("error", errorListener);
                this.prepareChannel();
                if (this.secret) {
                    // The secret is written first; the handshake is sent
                    // only after that write has been flushed.
                    this.socket.write(this.secret, () => {
                        this.send(channel_1.RpcEvents.HANDSHAKE, this.id);
                    });
                }
                else {
                    this.send(channel_1.RpcEvents.HANDSHAKE, this.id);
                }
            };
            if (this.path) {
                this.socket = net.createConnection(this.path, connectListener);
            }
            else {
                this.socket = net.createConnection(this.port, this.host, connectListener);
            }
            this.socket = this.bsp.wrap(this.socket);
            this.socket.once("error", errorListener);
        });
    }
    /**
     * Closes the channel permanently: stops the ping and self-destruction
     * timers, resets the reconnect backoff, marks registered modules as not
     * ready and ends the socket.
     *
     * @returns A promise that resolves with this client.
     */
    close() {
        return new Promise(resolve => {
            clearInterval(this.pingTimer);
            clearTimeout(this.selfDestruction);
            this.pingTimer = null;
            this.selfDestruction = null;
            this.state = "closed";
            // `reconnect` is only created after the first successful
            // connection, so it may still be null here.
            this.reconnect && this.reconnect.reset();
            this.pause();
            if (this.socket) {
                this.socket.unref();
                this.socket.end();
            }
            resolve(this);
        });
    }
    /**
     * Registers a module so that its remote singleton for this server routes
     * method calls through this client. Registering the same module name
     * twice is a no-op.
     *
     * @param mod The module proxy to register.
     * @returns This client.
     */
    register(mod) {
        if (!this.registry[mod.name]) {
            this.registry[mod.name] = mod;
            const singletons = mod["remoteSingletons"];
            singletons[this.serverId] = util_1.createRemoteInstance(mod, (prop) => this.createFunction(mod, prop));
            // Ready state is 2 while connected and 0 otherwise.
            singletons[this.serverId][util_1.readyState] = this.connected ? 2 : 0;
        }
        return this;
    }
    /**
     * Starts the keep-alive ping timer and creates the exponential backoff
     * used for reconnecting.
     *
     * @param serverId Server identifier used in the "lost permanently" log
     *  message.
     */
    setPingAndReconnectTimer(serverId) {
        this.pingTimer = setInterval(() => {
            const duration = Date.now() - this.lastActiveTime;
            if (duration >= this.pingInterval) {
                // Arm the self-destruction timer only when none is pending.
                // Re-arming on every tick would overwrite the handle of the
                // earlier timer, which the "data" handler could then no
                // longer cancel, so a late reply would not stop it from
                // destroying a healthy socket.
                if (!this.selfDestruction) {
                    const socket = this.socket;
                    this.selfDestruction = setTimeout(() => {
                        this.selfDestruction = null;
                        socket.destroy();
                    }, this.timeout);
                }
                this.send(channel_1.RpcEvents.PING, this.id);
            }
        }, 5000);
        this.reconnect = backoff_1.exponential({
            maxDelay: 5000
        }).on("ready", async (num) => {
            try {
                await this.open();
            }
            catch (e) {
                // Deliberately ignored: a failed attempt is detected through
                // `this.connected` below and leads to another backoff.
            }
            if (this.connected) {
                this.reconnect.reset();
                this.resume();
            }
            else if (num === 365) {
                // Give up once the backoff reports attempt number 365.
                await this.close();
                console.error(`Connection to ${serverId} lost permanently`);
            }
            else {
                this.reconnect.backoff();
            }
        });
    }
    /**
     * Sets the ready state of this server's remote singleton on every
     * registered module.
     *
     * @param state The ready-state value to store.
     */
    flushReadyState(state) {
        for (const name in this.registry) {
            const mod = this.registry[name];
            const singletons = mod["remoteSingletons"];
            singletons[this.serverId][util_1.readyState] = state;
        }
    }
    /** Marks all registered remote singletons as not ready (state 0). */
    pause() {
        this.flushReadyState(0);
    }
    /** Marks all registered remote singletons as ready (state 2). */
    resume() {
        this.flushReadyState(2);
    }
    /**
     * Adds a handler for messages broadcast on the given topic.
     *
     * @param topic Topic name.
     * @param handle Callback invoked with the broadcast data.
     * @returns This client.
     */
    subscribe(topic, handle) {
        let handlers = this.topics.get(topic);
        if (!handlers) {
            handlers = new Set();
            this.topics.set(topic, handlers);
        }
        handlers.add(handle);
        return this;
    }
    /**
     * Removes one handler of a topic, or all of them when `handle` is
     * omitted.
     *
     * @param topic Topic name.
     * @param handle The handler to remove; optional.
     * @returns `true` if something was removed, `false` otherwise.
     */
    unsubscribe(topic, handle) {
        if (!handle) {
            return this.topics.delete(topic);
        }
        const handlers = this.topics.get(topic);
        return handlers ? handlers.delete(handle) : false;
    }
    /**
     * Writes a message to the socket. A trailing `undefined` element is
     * dropped before writing. The message is silently discarded when there
     * is no socket or the socket is destroyed or not writable.
     *
     * @param data The message elements, written as a single array.
     */
    send(...data) {
        if (this.socket && !this.socket.destroyed && this.socket.writable) {
            if (last(data) === undefined) {
                data.pop();
            }
            this.socket.write(data);
        }
    }
    /**
     * Builds the function that stands in for `method` on the remote instance
     * of `mod`.
     *
     * @param mod The module proxy.
     * @param method Name of the method to call.
     * @returns A function returning a `ThenableAsyncGenerator`.
     */
    createFunction(mod, method) {
        const self = this;
        return function (...args) {
            const root = mod[util_1.proxyRoot];
            // The target server is the one running in this very process:
            // call the local instance instead of going through the socket.
            if (root && root["server"] && root["server"].id === self.serverId) {
                const ins = mod.instance();
                if (isOwnKey_1.default(ins, util_1.readyState) && ins[util_1.readyState] !== 2 &&
                    !mod.fallbackToLocal()) {
                    util_1.throwUnavailableError(mod.name);
                }
                else {
                    return new thenable_generator_1.ThenableAsyncGenerator(ins[method](...args));
                }
            }
            if (!self.connected) {
                if (mod.fallbackToLocal()) {
                    return new thenable_generator_1.ThenableAsyncGenerator(mod.instance()[method](...args));
                }
                else {
                    util_1.throwUnavailableError(mod.name);
                }
            }
            return new thenable_generator_1.ThenableAsyncGenerator(new ThenableIteratorProxy(self, mod.name, method, ...args));
        };
    }
    /**
     * Installs the "error", "close" and "data" listeners on the current
     * socket.
     *
     * @returns This client.
     */
    prepareChannel() {
        this.socket.on("error", err => {
            // Socket reset errors are not reported; the "close" handler
            // below deals with the lost connection.
            if (!isSocketResetError(err)) {
                if (this.errorHandler) {
                    this.errorHandler(err);
                }
                else {
                    console.error(err);
                }
            }
        }).on("close", () => {
            if (this.connecting) {
                // Closed during a handshake: fail the pending `open()`.
                this.rejectConnect && this.rejectConnect();
            }
            else if (!this.closed) {
                // Unexpected loss of an established connection.
                this.state = "connecting";
                this.pause();
                this.reconnect && this.reconnect.backoff();
            }
        }).on("data", async (msg) => {
            this.lastActiveTime = Date.now();
            // Any incoming data proves the server is alive.
            if (this.selfDestruction) {
                clearTimeout(this.selfDestruction);
                this.selfDestruction = null;
            }
            if (this.codec === "BSON") {
                // Presumably the BSON codec yields an object with index
                // keys; give it a `length` so it can become a real array.
                msg = Array.from(Object.assign(msg, {
                    length: Object.keys(msg).length
                }));
            }
            let [event, taskId, data] = msg;
            switch (event) {
                case channel_1.RpcEvents.CONNECT: {
                    // The server may announce an id that differs from the
                    // one used so far; re-key the remote singletons.
                    if (data !== this.serverId) {
                        for (const name in this.registry) {
                            const mod = this.registry[name];
                            const singletons = mod["remoteSingletons"];
                            if (singletons[this.serverId]) {
                                singletons[data] = singletons[this.serverId];
                                delete singletons[this.serverId];
                            }
                        }
                    }
                    this.serverId = data;
                    this.finishConnect();
                    break;
                }
                case channel_1.RpcEvents.BROADCAST: {
                    // For broadcasts the second element is the topic name.
                    const handlers = this.topics.get(taskId);
                    if (handlers) {
                        handlers.forEach(async (handle) => {
                            try {
                                await handle(data);
                            }
                            catch (err) {
                                this.errorHandler && this.errorHandler(err);
                            }
                        });
                    }
                    break;
                }
                case channel_1.RpcEvents.INVOKE:
                case channel_1.RpcEvents.YIELD:
                case channel_1.RpcEvents.RETURN: {
                    const task = this.tasks.get(taskId);
                    if (task) {
                        task.resolve(data);
                    }
                    break;
                }
                case channel_1.RpcEvents.THROW: {
                    const task = this.tasks.get(taskId);
                    if (task) {
                        (this.codec !== "CLONE") && (data = structured_clone_1.decompose(data));
                        task.reject(data);
                    }
                    break;
                }
            }
        });
        return this;
    }
}
exports.RpcClient = RpcClient;
/**
 * Client-side stand-in for one remote call. It can be awaited like a promise
 * (via `then`) or driven like an async iterator (via `next`, `return` and
 * `throw`). Each operation is sent to the server under the same task id and
 * settled, in FIFO order, by the replies the client dispatches to the task
 * registered in `client.tasks`.
 */
class ThenableIteratorProxy {
    /**
     * @param client The owning client; its `taskId`, `tasks`, `send` and
     *  `timeout` members are used.
     * @param modName Name of the remote module.
     * @param method Name of the remote method.
     * @param args Arguments of the remote call.
     */
    constructor(client, modName, method, ...args) {
        this.client = client;
        this.modName = modName;
        this.method = method;
        this.taskId = client["taskId"].next().value;
        this.queue = [];
        this.status = "uninitiated";
        this.args = args;
        // The INVOKE message is sent right away; `result` holds its promise.
        this.result = this.invokeTask(channel_1.RpcEvents.INVOKE, ...args);
    }
    /** Requests the next value from the remote generator. */
    next(value) {
        return this.invokeTask(channel_1.RpcEvents.YIELD, value);
    }
    /** Asks the remote generator to finish with `value`. */
    return(value) {
        return this.invokeTask(channel_1.RpcEvents.RETURN, value);
    }
    /** Throws `err` into the remote generator. */
    throw(err) {
        return this.invokeTask(channel_1.RpcEvents.THROW, err);
    }
    /**
     * Makes the proxy awaitable. Once the invocation result is available the
     * proxy is closed, the result is cached and the task is unregistered.
     */
    then(resolver, rejecter) {
        const finalize = (outcome) => {
            this.status = "closed";
            this.result = outcome;
            this.client["tasks"].delete(this.taskId);
            return outcome;
        };
        return Promise.resolve(this.result).then(finalize).then(resolver, rejecter);
    }
    /**
     * Closes the proxy and settles every operation still waiting in the
     * queue with the outcome matching its kind.
     */
    close() {
        const { INVOKE, YIELD, RETURN, THROW } = channel_1.RpcEvents;
        this.status = "closed";
        for (const pending of this.queue) {
            const kind = pending.event;
            if (kind === INVOKE) {
                pending.resolve(void 0);
            }
            else if (kind === YIELD) {
                pending.resolve({ value: void 0, done: true });
            }
            else if (kind === RETURN) {
                pending.resolve({ value: pending.data, done: true });
            }
            else if (kind === THROW) {
                pending.reject(pending.data);
            }
        }
        this.queue = [];
    }
    /**
     * Starts a timer that, after `client.timeout` milliseconds, rejects the
     * oldest queued operation with a timeout error and closes the proxy.
     *
     * @returns The timer handle.
     */
    createTimeout() {
        const onExpire = () => {
            if (this.queue.length > 0) {
                const oldest = this.queue.shift();
                const callee = `${this.modName}(<route>).${this.method}()`;
                const duration = util_1.humanizeDuration(this.client.timeout);
                oldest.reject(new Error(`${callee} timeout after ${duration}`));
            }
            this.close();
        };
        return setTimeout(onExpire, this.client.timeout);
    }
    /**
     * Registers this proxy's task with the client (once) and queues a new
     * pending operation.
     *
     * @param event The kind of operation.
     * @param data The value sent with the operation.
     * @returns A promise settled by the matching server reply, by a timeout
     *  or by `close()`.
     */
    prepareTask(event, data) {
        const registry = this.client["tasks"];
        if (!registry.get(this.taskId)) {
            // Replies are only honoured while the proxy is suspended; each
            // one settles the oldest queued operation.
            registry.set(this.taskId, {
                resolve: (reply) => {
                    if (this.status !== "suspended" || this.queue.length === 0) {
                        return;
                    }
                    this.queue.shift().resolve(reply);
                },
                reject: (err) => {
                    if (this.status !== "suspended") {
                        return;
                    }
                    if (this.queue.length > 0) {
                        this.queue.shift().reject(err);
                    }
                    this.close();
                }
            });
        }
        return new Promise((resolve, reject) => {
            const timer = this.createTimeout();
            // Whichever way the operation settles, its timer is cancelled.
            const settleWith = (settle) => (payload) => {
                clearTimeout(timer);
                settle(payload);
            };
            this.queue.push({
                event,
                data,
                resolve: settleWith(resolve),
                reject: settleWith(reject)
            });
        });
    }
    /**
     * Sends one operation to the server and waits for its outcome. On a
     * closed proxy nothing is sent and the final outcome for that kind of
     * operation is produced locally.
     *
     * @param event The kind of operation.
     * @param args Values sent with the operation.
     */
    async invokeTask(event, ...args) {
        const { INVOKE, YIELD, RETURN, THROW } = channel_1.RpcEvents;
        if (this.status === "closed") {
            if (event === INVOKE) {
                return Promise.resolve(this.result);
            }
            if (event === YIELD) {
                return Promise.resolve({ value: undefined, done: true });
            }
            if (event === RETURN) {
                return Promise.resolve({ value: args[0], done: true });
            }
            if (event === THROW) {
                return Promise.reject(args[0]);
            }
            return;
        }
        // A non-INVOKE operation on a proxy that has not been initiated yet
        // carries the original call arguments along as an array.
        const carriesCallArgs = this.status === "uninitiated" && event !== INVOKE;
        const payload = carriesCallArgs ? [[...this.args], ...args] : args;
        this.client["send"](event, this.taskId, this.modName, this.method, ...payload);
        this.status = "suspended";
        try {
            const res = await this.prepareTask(event, args[0]);
            if (event === INVOKE) {
                return res;
            }
            if (!("value" in res)) {
                res.value = void 0;
            }
            if (res.done) {
                this.status = "closed";
                this.result = res.value;
                this.client["tasks"].delete(this.taskId);
            }
            return res;
        }
        catch (err) {
            this.status = "closed";
            this.client["tasks"].delete(this.taskId);
            throw err;
        }
    }
}
433//# sourceMappingURL=client.js.map
\No newline at end of file