UNPKG

3.39 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.ClientTCP = void 0;
4const common_1 = require("@nestjs/common");
5const net = require("net");
6const rxjs_1 = require("rxjs");
7const operators_1 = require("rxjs/operators");
8const constants_1 = require("../constants");
9const json_socket_1 = require("../helpers/json-socket");
10const client_proxy_1 = require("./client-proxy");
11class ClientTCP extends client_proxy_1.ClientProxy {
12 constructor(options) {
13 super();
14 this.logger = new common_1.Logger(ClientTCP.name);
15 this.isConnected = false;
16 this.port = this.getOptionsProp(options, 'port') || constants_1.TCP_DEFAULT_PORT;
17 this.host = this.getOptionsProp(options, 'host') || constants_1.TCP_DEFAULT_HOST;
18 this.initializeSerializer(options);
19 this.initializeDeserializer(options);
20 }
21 connect() {
22 if (this.isConnected && this.connection) {
23 return this.connection;
24 }
25 this.socket = this.createSocket();
26 this.bindEvents(this.socket);
27 const source$ = this.connect$(this.socket.netSocket).pipe(operators_1.tap(() => {
28 this.isConnected = true;
29 this.socket.on(constants_1.MESSAGE_EVENT, (buffer) => this.handleResponse(buffer));
30 }), operators_1.share());
31 this.socket.connect(this.port, this.host);
32 this.connection = rxjs_1.lastValueFrom(source$).catch(err => {
33 if (err instanceof rxjs_1.EmptyError) {
34 return;
35 }
36 throw err;
37 });
38 return this.connection;
39 }
40 async handleResponse(buffer) {
41 const { err, response, isDisposed, id } = await this.deserializer.deserialize(buffer);
42 const callback = this.routingMap.get(id);
43 if (!callback) {
44 return undefined;
45 }
46 if (isDisposed || err) {
47 return callback({
48 err,
49 response,
50 isDisposed: true,
51 });
52 }
53 callback({
54 err,
55 response,
56 });
57 }
58 createSocket() {
59 return new json_socket_1.JsonSocket(new net.Socket());
60 }
61 close() {
62 this.socket && this.socket.end();
63 this.handleClose();
64 }
65 bindEvents(socket) {
66 socket.on(constants_1.ERROR_EVENT, (err) => err.code !== constants_1.ECONNREFUSED && this.handleError(err));
67 socket.on(constants_1.CLOSE_EVENT, () => this.handleClose());
68 }
69 handleError(err) {
70 this.logger.error(err);
71 }
72 handleClose() {
73 this.isConnected = false;
74 this.socket = null;
75 }
76 publish(partialPacket, callback) {
77 try {
78 const packet = this.assignPacketId(partialPacket);
79 const serializedPacket = this.serializer.serialize(packet);
80 this.routingMap.set(packet.id, callback);
81 this.socket.sendMessage(serializedPacket);
82 return () => this.routingMap.delete(packet.id);
83 }
84 catch (err) {
85 callback({ err });
86 }
87 }
88 async dispatchEvent(packet) {
89 const pattern = this.normalizePattern(packet.pattern);
90 const serializedPacket = this.serializer.serialize(Object.assign(Object.assign({}, packet), { pattern }));
91 return this.socket.sendMessage(serializedPacket);
92 }
93}
94exports.ClientTCP = ClientTCP;