// UNPKG - 3.86 kB - JavaScript - View Raw
"use strict";
// Flag the module for ES-module interop and initialise the export slot; the
// slot is assigned the class at the bottom of the file.
Object.defineProperty(exports, "__esModule", { value: true });
exports.ServerTCP = void 0;
// Dependencies (CommonJS). Order is preserved from the original listing.
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const net = require("net");
const constants_1 = require("../constants");
const tcp_context_1 = require("../ctx-host/tcp.context");
const enums_1 = require("../enums");
const helpers_1 = require("../helpers");
const server_1 = require("./server");
/**
 * TCP transport server backed by a Node.js `net` server.
 *
 * Every accepted connection is wrapped in the configured socket class
 * (`helpers_1.JsonSocket` unless `options.socketClass` is supplied) and its
 * incoming messages are routed to handlers looked up by pattern.
 */
class ServerTCP extends server_1.Server {
    /**
     * @param {object} options Transport options. `port`, `host` and
     *   `socketClass` are read here; `retryAttempts` and `retryDelay` are read
     *   later by `handleClose`.
     */
    constructor(options) {
        super();
        this.options = options;
        this.transportId = enums_1.Transport.TCP;
        this.isExplicitlyTerminated = false;
        this.retryAttemptsCount = 0;
        // `||` is kept on purpose: any falsy option value falls back to the
        // default, which is the behaviour existing callers rely on.
        this.port = this.getOptionsProp(options, 'port') || constants_1.TCP_DEFAULT_PORT;
        this.host = this.getOptionsProp(options, 'host') || constants_1.TCP_DEFAULT_HOST;
        this.socketClass =
            this.getOptionsProp(options, 'socketClass') || helpers_1.JsonSocket;
        this.init();
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * Starts listening on the configured port and host.
     *
     * @param {Function} callback Passed to `server.listen`; it is additionally
     *   invoked with the error when the first server error carries the
     *   `EADDRINUSE` or `ECONNREFUSED` code.
     */
    listen(callback) {
        this.server.once(constants_1.ERROR_EVENT, (err) => {
            const code = err == null ? undefined : err.code;
            if (code === constants_1.EADDRINUSE || code === constants_1.ECONNREFUSED) {
                return callback(err);
            }
        });
        this.server.listen(this.port, this.host, callback);
    }
    /**
     * Closes the server and records that the shutdown was requested, so that
     * `handleClose` does not try to re-listen.
     */
    close() {
        this.isExplicitlyTerminated = true;
        this.server.close();
    }
    /**
     * Connection listener: wraps the raw socket and wires its message and
     * error events.
     *
     * @param {object} socket Raw connection handed over by the `net` server.
     */
    bindHandler(socket) {
        const readSocket = this.getSocketInstance(socket);
        readSocket.on(constants_1.MESSAGE_EVENT, async (msg) => {
            // handleMessage is async: without this guard a throwing
            // deserializer, a malformed packet or a rejecting handler would
            // surface as an unhandled promise rejection. Route it to the same
            // error handler that socket errors use.
            try {
                await this.handleMessage(readSocket, msg);
            }
            catch (err) {
                this.handleError(err);
            }
        });
        readSocket.on(constants_1.ERROR_EVENT, this.handleError.bind(this));
    }
    /**
     * Deserializes one incoming message and dispatches it.
     *
     * - A packet without an `id` is forwarded to `handleEvent`.
     * - A packet with an `id` but no registered handler is answered with an
     *   error packet carrying `NO_MESSAGE_HANDLER`.
     * - Otherwise the handler result is passed through `transformToObservable`
     *   and handed to `send`; the callback stamps the packet `id` on the
     *   outgoing data, serializes it and writes it to the socket.
     *
     * @param {object} socket Wrapped socket the message arrived on.
     * @param {*} rawMessage Message as emitted by the wrapped socket.
     * @returns {Promise<*>}
     */
    async handleMessage(socket, rawMessage) {
        const packet = await this.deserializer.deserialize(rawMessage);
        // Non-string patterns are normalised to their JSON representation.
        const pattern = (0, shared_utils_1.isString)(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);
        const tcpContext = new tcp_context_1.TcpContext([socket, pattern]);
        if ((0, shared_utils_1.isUndefined)(packet.id)) {
            return this.handleEvent(pattern, packet, tcpContext);
        }
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = this.serializer.serialize({
                id: packet.id,
                status,
                err: constants_1.NO_MESSAGE_HANDLER,
            });
            return socket.sendMessage(noHandlerPacket);
        }
        const response$ = this.transformToObservable(await handler(packet.data, tcpContext));
        response$ &&
            this.send(response$, data => {
                Object.assign(data, { id: packet.id });
                const outgoingResponse = this.serializer.serialize(data);
                socket.sendMessage(outgoingResponse);
            });
    }
    /**
     * Server `close` listener. Schedules a re-listen unless the server was
     * closed through `close()`, no `retryAttempts` option is set, or the
     * attempts are exhausted.
     *
     * @returns {*} The timer returned by `setTimeout`, or `undefined` when no
     *   retry is scheduled.
     */
    handleClose() {
        if (this.isExplicitlyTerminated) {
            return undefined;
        }
        const retryAttempts = this.getOptionsProp(this.options, 'retryAttempts');
        if (!retryAttempts || this.retryAttemptsCount >= retryAttempts) {
            return undefined;
        }
        ++this.retryAttemptsCount;
        const retryDelay = this.getOptionsProp(this.options, 'retryDelay') || 0;
        return setTimeout(() => {
            // close() may have been called while the retry was pending; do not
            // bring a deliberately terminated server back up.
            if (this.isExplicitlyTerminated) {
                return;
            }
            this.server.listen(this.port, this.host);
        }, retryDelay);
    }
    /**
     * Creates the underlying `net` server and attaches the error and close
     * listeners.
     */
    init() {
        this.server = net.createServer(this.bindHandler.bind(this));
        this.server.on(constants_1.ERROR_EVENT, this.handleError.bind(this));
        this.server.on(constants_1.CLOSE_EVENT, this.handleClose.bind(this));
    }
    /**
     * Wraps a raw socket in the configured socket class.
     *
     * @param {object} socket Raw connection socket.
     * @returns {object} A new `this.socketClass` instance.
     */
    getSocketInstance(socket) {
        return new this.socketClass(socket);
    }
}
// Publish the class on the CommonJS export object.
exports.ServerTCP = ServerTCP;