UNPKG

21.6 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3const tslib_1 = require("tslib");
4const net = require("net");
5const path = require("path");
6const fs = require("fs-extra");
7const bsp_1 = require("bsp");
8const advanced_collections_1 = require("advanced-collections");
9const isSocketResetError = require("is-socket-reset-error");
10const sleep = require("sleep-promise");
11const sequid_1 = require("sequid");
12const check_iterable_1 = require("check-iterable");
13const thenable_generator_1 = require("thenable-generator");
14const util_1 = require("./util");
// Private symbol keys stored on server-side sockets: whether the socket has
// passed the secret check, and the timestamp of the last data it sent.
const authorized = Symbol("authorized");
const lastActiveTime = Symbol("lastActiveTime");

/**
 * Numeric codes of the messages exchanged between RpcServer and RpcClient.
 * The object maps both ways: `RpcEvents.PING === 7` and
 * `RpcEvents[7] === "PING"`.
 */
var RpcEvents;
(function (events) {
    // The position of a name in this list is its numeric code.
    const names = [
        "HANDSHAKE",
        "CONNECT",
        "BROADCAST",
        "INVOKE",
        "RETURN",
        "YIELD",
        "THROW",
        "PING",
        "PONG",
    ];
    names.forEach((name, code) => {
        events[name] = code;
        events[code] = name;
    });
})(RpcEvents || (RpcEvents = {}));
/**
 * Base class of the RPC server and client. It only stores the connection
 * settings and the optional error handler.
 */
class RpcChannel {
    /**
     * @param {object|number|string} options Either a settings object whose
     *  properties are copied onto the instance, a port number, or a value
     *  handed to `util_1.absPath()` to become the socket path.
     * @param {string} [host] Host name; only used when `options` is a number.
     */
    constructor(options, host) {
        // Defaults, possibly overridden below.
        this.host = "0.0.0.0";
        this.port = 9000;
        this.path = "";
        this.timeout = 5000;
        this.pingTimeout = 1000 * 30;

        switch (typeof options) {
            case "object":
                Object.assign(this, options);
                break;
            case "number":
                // `host` replaces the default even when it is undefined.
                Object.assign(this, { host, port: options });
                break;
            default:
                this.path = util_1.absPath(options);
                break;
        }
    }

    /**
     * Data source name of the channel: `ipc://<path>` when a path is set,
     * otherwise `rpc://[<host>:]<port>`.
     * @returns {string}
     */
    get dsn() {
        if (this.path) {
            return "ipc://" + this.path;
        }
        let name = "rpc://";
        if (this.port) {
            if (this.host) {
                name += this.host + ":";
            }
            name += this.port;
        }
        return name;
    }

    /**
     * Stores the function that receives errors reported by the channel.
     * @param {Function} handler
     */
    onError(handler) {
        this.errorHandler = handler;
    }

    /**
     * Records an error constructor in `util_1.Errors` under its own name.
     * @param {Function} ctor
     */
    static registerError(ctor) {
        util_1.Errors[ctor.name] = ctor;
    }
}
66exports.RpcChannel = RpcChannel;
/**
 * RPC server: accepts socket connections, authenticates them with the
 * optional `secret`, invokes methods of registered modules on behalf of
 * clients and can broadcast events to connected clients.
 */
class RpcServer extends RpcChannel {
    constructor() {
        super(...arguments);
        // Registered modules, keyed by module name.
        this.registry = {};
        // Bidirectional map of client id <-> socket.
        this.clients = new advanced_collections_1.BiMap();
        // socket -> { taskId: suspended generator task }.
        this.suspendedTasks = new Map();
        // Every `timeout` ms, destroy sockets that have sent nothing for
        // longer than `pingTimeout` (+5 ms).
        this.gcTimer = setInterval(() => {
            const now = Date.now();
            const timeout = this.pingTimeout + 5;
            for (const [, socket] of this.clients) {
                if (now - socket[lastActiveTime] > timeout) {
                    socket.destroy();
                }
            }
        }, this.timeout);
    }

    /**
     * Starts listening on `path`, or on `port` (and `host` when set).
     * @returns {Promise<this>} Resolves once the server is listening and
     *  rejects when preparing the socket path or listening fails.
     */
    open() {
        return new Promise((resolve, reject) => {
            const server = this.server = net.createServer();
            const listener = () => {
                resolve(this);
                // Errors after start-up go to the user's error handler.
                server.on("error", err => {
                    this.errorHandler && this.errorHandler.call(this, err);
                });
            };
            const start = () => tslib_1.__awaiter(this, void 0, void 0, function* () {
                if (this.path) {
                    yield fs.ensureDir(path.dirname(this.path));
                    // Remove an existing file at the socket path first.
                    if (yield fs.pathExists(this.path)) {
                        yield fs.unlink(this.path);
                    }
                    server.listen(util_1.absPath(this.path, true), listener);
                }
                else if (this.host) {
                    server.listen(this.port, this.host, listener);
                }
                else {
                    server.listen(this.port, listener);
                }
                server.once("error", reject)
                    .on("connection", this.handleConnection.bind(this));
            });
            // Failures while preparing the path (or a synchronous listen()
            // error) must reject the returned promise instead of leaving it
            // pending with an unhandled rejection.
            start().catch(reject);
        });
    }

    /**
     * Stops the idle-client timer and closes the server.
     * @returns {Promise<this>}
     */
    close() {
        return new Promise(resolve => {
            // The timer is created in the constructor, so it has to be
            // cleared even when open() was never called.
            clearInterval(this.gcTimer);
            if (this.server) {
                this.server.unref();
                this.server.close(() => resolve(this));
            }
            else {
                resolve(this);
            }
        });
    }

    /**
     * Registers a module so clients can invoke its methods.
     * @param {{name: string}} mod
     * @returns {this}
     */
    register(mod) {
        this.registry[mod.name] = mod;
        return this;
    }

    /**
     * Broadcasts an event to the given clients, or to all clients.
     * @param {string} event
     * @param {*} data
     * @param {Iterable<string>} [clients] Client ids; defaults to everyone.
     * @returns {boolean} Whether at least one known client was addressed.
     */
    publish(event, data, clients) {
        let sent = false;
        const targets = clients || this.clients.keys();
        for (const id of targets) {
            const socket = this.clients.get(id);
            if (socket) {
                this.dispatch(socket, RpcEvents.BROADCAST, event, data);
                sent = true;
            }
        }
        return sent;
    }

    /**
     * @returns {string[]} Ids of the clients that were active within the
     *  last `pingTimeout` milliseconds.
     */
    getClients() {
        const clients = [];
        const now = Date.now();
        for (const [id, socket] of this.clients) {
            if (now - socket[lastActiveTime] <= this.pingTimeout) {
                clients.push(id);
            }
        }
        return clients;
    }

    /**
     * Encodes `data` with `bsp_1.send` and writes it to the socket, unless
     * the socket is destroyed or no longer writable.
     */
    dispatch(socket, ...data) {
        if (!socket.destroyed && socket.writable) {
            socket.write(bsp_1.send(...data));
        }
    }

    /**
     * Wires up a newly accepted socket: error reporting, clean-up on close
     * and the message loop.
     * @param {net.Socket} socket
     */
    handleConnection(socket) {
        // Per-connection buffer handed to bsp_1.receive on every chunk.
        const temp = [];
        socket.on("error", err => {
            if (!isSocketResetError(err) && this.errorHandler) {
                this.errorHandler(err);
            }
        }).on("end", () => {
            socket.emit("close", false);
        }).on("close", () => {
            this.clients.deleteValue(socket);
            const tasks = this.suspendedTasks.get(socket);
            this.suspendedTasks.delete(socket);
            // Finish every generator the client left suspended.
            for (const id in tasks) {
                tasks[id].return();
            }
        }).on("data", (buf) => tslib_1.__awaiter(this, void 0, void 0, function* () {
            if (!socket[authorized]) {
                if (this.secret) {
                    const index = buf.indexOf("\r\n");
                    // Without the delimiter there is no complete secret
                    // line; slicing with -1 would otherwise accept
                    // `secret + <one arbitrary byte>`.
                    if (index === -1
                        || buf.slice(0, index).toString() !== this.secret) {
                        return socket.destroy();
                    }
                    buf = buf.slice(index + 2);
                }
                socket[authorized] = true;
            }
            socket[lastActiveTime] = Date.now();
            const msg = bsp_1.receive(buf, temp);
            for (let [event, taskId, name, method, ...args] of msg) {
                switch (event) {
                    case RpcEvents.HANDSHAKE:
                        // For a handshake, the second field is the client id.
                        this.clients.set(taskId, socket);
                        this.suspendedTasks.set(socket, {});
                        this.dispatch(socket, RpcEvents.CONNECT);
                        break;
                    case RpcEvents.PING:
                        this.dispatch(socket, RpcEvents.PONG);
                        break;
                    case RpcEvents.INVOKE:
                        {
                            let data;
                            const tasks = this.suspendedTasks.get(socket) || {};
                            try {
                                const ins = this.registry[name].instance(util_1.local);
                                const task = ins[method].apply(ins, args);
                                if (task && check_iterable_1.isIteratorLike(task[thenable_generator_1.source])) {
                                    // Generator call: keep it suspended and
                                    // acknowledge without a value.
                                    tasks[taskId] = task;
                                    event = RpcEvents.INVOKE;
                                }
                                else {
                                    data = yield task;
                                    event = RpcEvents.RETURN;
                                }
                            }
                            catch (err) {
                                event = RpcEvents.THROW;
                                data = util_1.err2obj(err);
                            }
                            this.dispatch(socket, event, taskId, data);
                        }
                        break;
                    case RpcEvents.YIELD:
                    case RpcEvents.RETURN:
                    case RpcEvents.THROW:
                        {
                            let data, input;
                            const tasks = this.suspendedTasks.get(socket) || {};
                            const task = tasks[taskId];
                            try {
                                if (!task) {
                                    throw new ReferenceError(`task (${taskId}) doesn't exist`);
                                }
                                else {
                                    input = args[0];
                                }
                                if (event === RpcEvents.YIELD) {
                                    data = yield task.next(input);
                                }
                                else if (event === RpcEvents.RETURN) {
                                    data = yield task.return(input);
                                }
                                else {
                                    // Usually throw() rejects and control
                                    // jumps to the catch block. When the
                                    // generator handles the error instead,
                                    // forward its result as a normal reply
                                    // rather than failing on `data.done`.
                                    data = yield task.throw(input);
                                    event = data.done
                                        ? RpcEvents.RETURN
                                        : RpcEvents.YIELD;
                                }
                                data.done && (delete tasks[taskId]);
                            }
                            catch (err) {
                                event = RpcEvents.THROW;
                                data = util_1.err2obj(err);
                                task && (delete tasks[taskId]);
                            }
                            this.dispatch(socket, event, taskId, data);
                        }
                        break;
                }
            }
        }));
    }
}
254exports.RpcServer = RpcServer;
/**
 * RPC client: connects to an RpcServer, keeps the connection alive with
 * pings, reconnects after unexpected closes and turns registered modules
 * into remote proxies.
 */
class RpcClient extends RpcChannel {
    /**
     * @param {object|number|string} options Passed on to RpcChannel.
     * @param {string} [host] Passed on to RpcChannel.
     */
    constructor(options, host) {
        super(options, host);
        this.connecting = false;
        this.connected = false;
        this.closed = false;
        this.socket = null;
        // True once a connection has been established at least once.
        this.initiated = false;
        this.registry = {};
        // Buffer handed to bsp_1.receive on every chunk.
        this.temp = [];
        this.taskId = sequid_1.default(0, true);
        // Pending remote calls, keyed by task id.
        this.tasks = {};
        // Broadcast listeners, keyed by event name.
        this.events = {};
        this.finishConnect = null;
        this.selfDestruction = null;
        // Every `pingTimeout` ms send a PING and arm a timer that destroys
        // the socket unless a PONG arrives within `timeout` ms.
        this.pingTimer = setInterval(() => {
            // Never leave an older self-destruct timer dangling.
            clearTimeout(this.selfDestruction);
            this.selfDestruction = setTimeout(() => {
                // The socket is still null when open() was never called.
                this.socket && this.socket.destroy();
            }, this.timeout);
            this.send(RpcEvents.PING, this.id);
        }, this.pingTimeout);
        this.id = this.id || Math.random().toString(16).slice(2);
    }

    /**
     * Connects to the server and performs the handshake.
     * @returns {Promise<this>} Resolves after the server confirmed the
     *  connection; resolves immediately when already connecting, connected
     *  or closed. Rejects when the very first connection attempt fails.
     */
    open() {
        return new Promise((resolve, reject) => {
            if ((this.socket && this.socket.connecting)
                || this.connected || this.closed) {
                return resolve(this);
            }
            this.connecting = true;
            const connectListener = () => {
                this.initiated = true;
                this.connecting = false;
                this.socket.removeListener("error", errorListener);
                // Called when the server answers with CONNECT.
                this.finishConnect = () => {
                    this.connected = true;
                    resolve(this);
                };
                // The secret line (possibly empty) always goes first.
                this.socket.write((this.secret || "") + "\r\n", () => {
                    this.send(RpcEvents.HANDSHAKE, this.id);
                });
            };
            const errorListener = (err) => {
                this.connecting = false;
                this.socket.removeListener("connect", connectListener);
                if (this.initiated) {
                    // A failed re-connection is not reported as an error.
                    resolve(this);
                }
                else {
                    reject(err);
                }
            };
            if (this.path) {
                this.socket = net.createConnection(this.path, connectListener);
            }
            else {
                this.socket = net.createConnection(this.port, this.host, connectListener);
            }
            this.socket.once("error", errorListener);
            this.prepareChannel();
        });
    }

    /**
     * Stops the timers, detaches the remote instances and ends the socket.
     * @returns {Promise<this>}
     */
    close() {
        return new Promise(resolve => {
            clearInterval(this.pingTimer);
            clearTimeout(this.selfDestruction);
            this.closed = true;
            this.connected = false;
            this.connecting = false;
            this.pause();
            if (this.socket) {
                this.socket.unref();
                this.socket.end();
            }
            resolve(this);
        });
    }

    /**
     * Registers a module and installs a remote instance for this channel's
     * DSN whose methods are forwarded to the server.
     * @param {object} mod
     * @returns {this}
     */
    register(mod) {
        this.registry[mod.name] = mod;
        mod[util_1.remotized] = true;
        mod["remoteSingletons"][this.dsn] = util_1.createRemoteInstance(mod, (prop) => {
            return this.createFunction(mod.name, prop);
        });
        return this;
    }

    /**
     * Removes this channel's remote instance from every registered module.
     * @returns {boolean} True when at least one module is registered.
     */
    pause() {
        const { dsn } = this;
        let success = false;
        for (const name in this.registry) {
            const instances = this.registry[name]["remoteSingletons"];
            delete instances[dsn];
            success = true;
        }
        return success;
    }

    /**
     * Re-installs the remote instance for modules that lost it.
     * @returns {boolean} True when at least one module was re-registered.
     */
    resume() {
        const { dsn } = this;
        let success = false;
        for (const name in this.registry) {
            const instances = this.registry[name]["remoteSingletons"];
            if (!instances[dsn]) {
                this.register(this.registry[name]);
                success = true;
            }
        }
        return success;
    }

    /**
     * Adds a listener for a broadcast event.
     * @param {string} event
     * @param {Function} listener
     * @returns {this}
     */
    subscribe(event, listener) {
        if (!this.events[event]) {
            this.events[event] = [];
        }
        this.events[event].push(listener);
        return this;
    }

    /**
     * Removes one listener of an event, or all of them when `listener` is
     * omitted.
     * @param {string} event
     * @param {Function} [listener]
     * @returns {boolean} Whether anything was removed.
     */
    unsubscribe(event, listener) {
        const listeners = this.events[event];
        if (!listener) {
            return listeners ? (delete this.events[event]) : false;
        }
        if (!listeners) {
            return false;
        }
        const i = listeners.indexOf(listener);
        if (i === -1) {
            // splice(-1, 1) would drop the LAST listener instead.
            return false;
        }
        return listeners.splice(i, 1).length > 0;
    }

    /**
     * Encodes `data` with `bsp_1.send` and writes it to the socket when the
     * socket is usable. A single trailing `undefined` is dropped first.
     */
    send(...data) {
        if (this.socket && !this.socket.destroyed && this.socket.writable) {
            if (data[data.length - 1] === undefined) {
                data.pop();
            }
            this.socket.write(bsp_1.send(...data));
        }
    }

    /**
     * Tries to open the connection again, optionally after a delay, and
     * restores the remote instances on success.
     * @param {number} [timeout=0] Delay in milliseconds.
     * @returns {Promise<void>}
     */
    reconnect(timeout = 0) {
        return tslib_1.__awaiter(this, void 0, void 0, function* () {
            if (this.connected || this.connecting)
                return;
            try {
                this.connecting = true;
                timeout && (yield sleep(timeout));
                yield this.open();
            }
            catch (e) {
                // Deliberately best-effort: a failed attempt is ignored.
            }
            if (this.connected) {
                this.resume();
            }
        });
    }

    /**
     * Builds the function that stands in for a remote method.
     * @param {string} name Module name.
     * @param {string} method Method name.
     * @returns {Function}
     */
    createFunction(name, method) {
        const self = this;
        return function (...args) {
            return new thenable_generator_1.ThenableAsyncGenerator(new ThenableIteratorProxy(self, name, method, ...args));
        };
    }

    /**
     * Attaches the error/end/close/data handlers to the current socket.
     * @returns {this}
     */
    prepareChannel() {
        this.socket.on("error", err => {
            if (this.connected &&
                !isSocketResetError(err) && this.errorHandler) {
                this.errorHandler(err);
            }
        }).on("end", () => {
            if (!this.connecting && this.socket.destroyed) {
                this.socket.emit("close", false);
            }
        }).on("close", hadError => {
            this.connected = false;
            this.pause();
            // Reconnect unless the user closed the client or a connection
            // attempt is already running.
            if (!this.closed && !this.connecting && this.initiated) {
                this.reconnect(hadError ? this.timeout : 0);
            }
        }).on("data", (buf) => tslib_1.__awaiter(this, void 0, void 0, function* () {
            const msg = bsp_1.receive(buf, this.temp);
            for (const [event, taskId, data] of msg) {
                let task;
                switch (event) {
                    case RpcEvents.CONNECT:
                        this.finishConnect();
                        break;
                    case RpcEvents.BROADCAST: {
                        // For broadcasts the second field is the event name.
                        const listeners = this.events[taskId] || [];
                        for (const handle of listeners) {
                            yield handle(data);
                        }
                        break;
                    }
                    case RpcEvents.INVOKE:
                    case RpcEvents.YIELD:
                    case RpcEvents.RETURN:
                        if (task = this.tasks[taskId]) {
                            task.resolve(data);
                        }
                        break;
                    case RpcEvents.THROW:
                        if (task = this.tasks[taskId]) {
                            task.reject(util_1.obj2err(data));
                        }
                        break;
                    case RpcEvents.PONG:
                        // The server is alive: disarm the self-destruct.
                        clearTimeout(this.selfDestruction);
                        this.selfDestruction = null;
                        break;
                }
            }
        }));
        return this;
    }
}
465exports.RpcClient = RpcClient;
/**
 * Client-side stand-in for one remote call. It is both thenable (plain
 * method call) and iterator-like (remote generator); every operation is sent
 * to the server and answered through a FIFO queue of pending requests.
 */
class ThenableIteratorProxy {
    /**
     * Sends the INVOKE request right away.
     * @param {object} client The owning RPC client.
     * @param {string} name Module name.
     * @param {string} method Method name.
     * @param {...*} args Call arguments.
     */
    constructor(client, name, method, ...args) {
        this.client = client;
        this.name = name;
        this.method = method;
        this.taskId = this.client["taskId"].next().value;
        this.queue = [];
        this.status = "uninitiated";
        this.args = args;
        this.result = this.invokeTask(RpcEvents.INVOKE, ...this.args);
    }

    /** Asks the remote generator for its next value. */
    next(value) {
        return this.invokeTask(RpcEvents.YIELD, value);
    }

    /** Asks the remote generator to finish with `value`. */
    return(value) {
        return this.invokeTask(RpcEvents.RETURN, value);
    }

    /** Throws `err` (serialized by `util_1.err2obj`) into the remote generator. */
    throw(err) {
        return this.invokeTask(RpcEvents.THROW, util_1.err2obj(err));
    }

    /**
     * Makes the proxy awaitable: waits for the call result, marks the proxy
     * closed, forgets the task and passes the result on.
     */
    then(resolver, rejecter) {
        const settle = (outcome) => {
            this.status = "closed";
            this.result = outcome;
            delete this.client["tasks"][this.taskId];
            return outcome;
        };
        return Promise.resolve(this.result).then(settle).then(resolver, rejecter);
    }

    /**
     * Marks the proxy closed and settles every queued request with the value
     * appropriate for its kind.
     */
    close() {
        this.status = "closed";
        for (const pending of this.queue) {
            const kind = pending.event;
            if (kind === RpcEvents.INVOKE) {
                pending.resolve(void 0);
            }
            else if (kind === RpcEvents.YIELD) {
                pending.resolve({ value: void 0, done: true });
            }
            else if (kind === RpcEvents.RETURN) {
                pending.resolve({ value: pending.data, done: true });
            }
            else if (kind === RpcEvents.THROW) {
                pending.reject(pending.data);
            }
        }
        this.queue = [];
    }

    /**
     * Arms a timer that, after the client's timeout, rejects the oldest
     * queued request and closes the proxy.
     */
    createTimeout() {
        const onExpire = () => {
            const seconds = Math.round(this.client.timeout / 1000);
            const unit = seconds === 1 ? "second" : "seconds";
            if (this.queue.length > 0) {
                const oldest = this.queue.shift();
                oldest.reject(new Error(`RPC request timeout after ${seconds} ${unit}`));
            }
            this.close();
        };
        return setTimeout(onExpire, this.client.timeout);
    }

    /**
     * Makes sure the client has a task entry for this proxy and queues a new
     * pending request.
     * @returns {Promise<*>} Settles when the request is answered or times out.
     */
    prepareTask(event, data) {
        const registry = this.client["tasks"];
        if (!registry[this.taskId]) {
            // Replies are only delivered while the proxy is suspended, always
            // to the oldest queued request.
            registry[this.taskId] = {
                resolve: (value) => {
                    if (this.status !== "suspended" || this.queue.length === 0) {
                        return;
                    }
                    this.queue.shift().resolve(value);
                },
                reject: (err) => {
                    if (this.status !== "suspended") {
                        return;
                    }
                    if (this.queue.length > 0) {
                        this.queue.shift().reject(err);
                    }
                    this.close();
                }
            };
        }
        return new Promise((resolve, reject) => {
            const timer = this.createTimeout();
            const entry = {
                event,
                data,
                resolve: (value) => {
                    clearTimeout(timer);
                    resolve(value);
                },
                reject: (err) => {
                    clearTimeout(timer);
                    reject(err);
                }
            };
            this.queue.push(entry);
        });
    }

    /**
     * Sends a request to the server, or answers locally once the proxy is
     * closed.
     * @param {number} event One of the RpcEvents codes.
     * @param {...*} args Request payload.
     */
    invokeTask(event, ...args) {
        if (this.status === "closed") {
            // Nothing is sent any more; answer from local state.
            if (event === RpcEvents.INVOKE) {
                return Promise.resolve(this.result);
            }
            if (event === RpcEvents.YIELD) {
                return Promise.resolve({ value: undefined, done: true });
            }
            if (event === RpcEvents.RETURN) {
                return Promise.resolve({ value: args[0], done: true });
            }
            if (event === RpcEvents.THROW) {
                return Promise.reject(util_1.obj2err(args[0]));
            }
            return undefined;
        }

        // A non-INVOKE request on a still-uninitiated proxy carries the
        // original call arguments as an extra leading array.
        const needsArgs = this.status === "uninitiated"
            && event !== RpcEvents.INVOKE;
        const payload = needsArgs ? [[...this.args], ...args] : args;
        this.client.send(event, this.taskId, this.name, this.method, ...payload);
        this.status = "suspended";

        const onReply = (res) => {
            if (event !== RpcEvents.INVOKE) {
                if (!("value" in res)) {
                    res.value = void 0;
                }
                if (res.done) {
                    this.status = "closed";
                    this.result = res.value;
                    delete this.client["tasks"][this.taskId];
                }
            }
            return res;
        };
        const onFailure = (err) => {
            this.status = "closed";
            delete this.client["tasks"][this.taskId];
            throw err;
        };
        return this.prepareTask(event, args[0]).then(onReply).catch(onFailure);
    }
}
600//# sourceMappingURL=rpc.js.map
\No newline at end of file