UNPKG

10.2 kBJavaScriptView Raw
1"use strict";
2var __importStar = (this && this.__importStar) || function (mod) {
3 if (mod && mod.__esModule) return mod;
4 var result = {};
5 if (mod != null) for (var k in mod) if (Object.hasOwnProperty.call(mod, k)) result[k] = mod[k];
6 result["default"] = mod;
7 return result;
8};
9Object.defineProperty(exports, "__esModule", { value: true });
10const net = __importStar(require("net"));
11const events_1 = require("events");
12const timers_1 = require("timers");
13const debug_1 = require("debug");
14const version_1 = require("./version");
15let Debug = debug_1.debug('server-libs');
16class TcpUnit extends events_1.EventEmitter {
17 constructor(sockadd, sock, host = '', port = 0, serverId = '') {
18 super();
19 this.sockaddr = '';
20 this.host = '';
21 this.port = 0;
22 this.retryCount = 0;
23 this.serverId = '';
24 this.recv_pool = [];
25 this.recv_len = 0;
26 // 这里host 是 ::ffff: 开头的要剔除掉
27 this.sockaddr = sockadd;
28 this.sock = sock;
29 this.host = host.replace('::ffff:', '');
30 this.port = port;
31 this.serverId = serverId;
32 if (version_1.V_UnderV8 && this.off == undefined) {
33 this.off = this.removeListener;
34 }
35 }
36 on(event, listener) {
37 super.on(event, listener);
38 return this;
39 }
40 reciveData(data) {
41 this.recv_pool.push(data);
42 this.recv_len += data.length;
43 this.doParse();
44 }
45 resetData(data) {
46 this.recv_pool.unshift(data);
47 this.recv_len += data.length;
48 }
49 process(cmd, info) {
50 switch (cmd) {
51 case 0:
52 case 1:
53 Debug(info.toString());
54 break;
55 }
56 // 这里如果存在主节点,那么提交给主节点处理,也可以自己处理
57 if (!this.serverId || TcpServer.isDispersedMode(this.serverId)) {
58 this.emit('data', cmd, info);
59 }
60 else {
61 TcpServer.serPools[this.serverId].emit('data', this.sockaddr, cmd, info);
62 }
63 }
64 doParse() {
65 let pool_buff;
66 let currlen = 0;
67 if (this.recv_pool.length == 1) {
68 pool_buff = this.recv_pool.shift();
69 }
70 else if (this.recv_pool.length > 1) {
71 pool_buff = Buffer.alloc(this.recv_len);
72 while (this.recv_pool.length) {
73 let cache_buff = this.recv_pool.shift();
74 if (!cache_buff)
75 break;
76 cache_buff.copy(pool_buff, currlen, 0, cache_buff.length);
77 currlen += cache_buff.length;
78 }
79 }
80 else {
81 return;
82 }
83 if (pool_buff.length < 2) {
84 // 数据不足,需要放到下一个里面合成一个数据块
85 this.recv_pool.unshift(pool_buff);
86 }
87 else {
88 let len = pool_buff.readInt16BE(0);
89 if (pool_buff.length < len) {
90 // 数据不足,需要放到下一个里面合成一个数据块
91 this.recv_pool.unshift(pool_buff);
92 }
93 else {
94 // 处理数据
95 this.process(pool_buff.readInt16BE(2), pool_buff.slice(4, len));
96 if (len < pool_buff.length) {
97 // 数据不足,需要放到下一个里面合成一个数据块
98 this.recv_pool.unshift(pool_buff.slice(len, pool_buff.length));
99 }
100 // 再处理一下
101 this.doParse();
102 }
103 }
104 }
105 sendMsg(cmd, msg) {
106 if (typeof msg == 'string') {
107 msg = Buffer.from(msg);
108 }
109 let datalen = msg.length + 4;
110 let dataBuff = Buffer.alloc(datalen);
111 dataBuff.writeInt16BE(datalen, 0);
112 dataBuff.writeInt16BE(cmd, 2);
113 msg.copy(dataBuff, 4, 0, msg.length);
114 return this.sock ? this.sock.write(dataBuff) : false;
115 }
116 close() {
117 this.sock && this.sock.end();
118 this.emit('close', this.sockaddr, false);
119 }
120}
121exports.TcpUnit = TcpUnit;
122/**tcp 连接的管理,负责tcp连接的响应和处理 */
123class TcpServer extends events_1.EventEmitter {
124 constructor(port) {
125 super();
126 this.netPools = {};
127 this.isDispersedMode = true;
128 this.sockaddr = 'localhost:' + port;
129 let tcp_server = net.createServer();
130 tcp_server.listen(port);
131 tcp_server.on("connection", this.onconnection.bind(this));
132 tcp_server.on('error', (error) => { this.emit('error', error); });
133 TcpServer.serPools[this.sockaddr] = this;
134 }
135 // 创建一个服务器
136 static create(port) {
137 return new TcpServer(port);
138 }
139 // 获取服务器节点
140 static getServer(sockaddr) {
141 return this.serPools[sockaddr];
142 }
143 static isDispersedMode(sockaddr) {
144 if (this.serPools.hasOwnProperty(sockaddr)) {
145 return this.serPools[sockaddr].isDispersedMode;
146 }
147 return true;
148 }
149 // 获取链接节点
150 getUnit(sockaddr) {
151 return this.netPools[sockaddr];
152 }
153 on(event, listener) {
154 if (event == 'data')
155 this.isDispersedMode = false;
156 super.on(event, listener);
157 return this;
158 }
159 onconnection(sock) {
160 let sockaddr = sock.remoteAddress + ':' + sock.remotePort;
161 sock.on('data', this.onData.bind(this, sockaddr));
162 sock.on('close', this.onClose.bind(this, sockaddr));
163 sock.on('error', this.onError.bind(this, sockaddr));
164 this.netPools[sockaddr] = new TcpUnit(sockaddr, sock, sock.remoteAddress, sock.remotePort, this.sockaddr);
165 this.emit("connection", sockaddr);
166 // this.pushMsg(sockadd, 1, "your host is" + sock.remoteAddress);
167 // this.pushMsg(sockadd, 2, "your port is" + sock.remotePort);
168 // 这里发起一个connect 测试一下conncet转listen的成果
169 }
170 pushMsg(sockaddr, cmd, msg) {
171 let sockUnit = this.netPools[sockaddr];
172 if (sockUnit) {
173 return sockUnit.sendMsg(cmd, msg);
174 }
175 return false;
176 }
177 onData(sockaddr, data) {
178 let sockUnit = this.netPools[sockaddr];
179 if (sockUnit) {
180 sockUnit.reciveData(data);
181 }
182 }
183 onClose(sockaddr) {
184 if (this.netPools.hasOwnProperty(sockaddr)) {
185 delete this.netPools[sockaddr];
186 this.emit("close", sockaddr);
187 }
188 }
189 onError(sockaddr) {
190 if (this.netPools.hasOwnProperty(sockaddr)) {
191 delete this.netPools[sockaddr];
192 this.emit("close", sockaddr);
193 }
194 }
195 close() {
196 // 这里要干掉所有的链接
197 let pools = Object.keys(this.netPools);
198 for (let i = 0; i < pools.length; i++) {
199 let net = this.netPools[pools[i]];
200 if (net)
201 net.close();
202 }
203 }
204}
205exports.TcpServer = TcpServer;
206TcpServer.serPools = {};
207// 链接socket
208class TcpSocket {
209 static create(host, port, count) {
210 let sockaddr = host + ':' + port;
211 if (count == null || count == undefined || count == 0)
212 count = 1;
213 let sockUnit = new TcpUnit(sockaddr, undefined, host, port);
214 return sockUnit;
215 // 连接成功
216 // Debug("succ:" + host + ':' + port);
217 // this.sendMsg(sock, 1, "欢迎实现connect&listen相同端口:" + port);
218 }
219 static _createSocket(sockUnit) {
220 let sock = net.createConnection(sockUnit.port, sockUnit.host);
221 sock.once("connect", () => {
222 sockUnit.retryCount = 0;
223 sockUnit.emit("connection");
224 });
225 sock.on("close", (e) => { sockUnit.emit('close', sockUnit.sockaddr, e); });
226 sock.on("data", (data) => { sockUnit.reciveData(data); });
227 sock.on("error", (e) => {
228 sockUnit.emit('error', sockUnit.sockaddr, e);
229 });
230 sockUnit.sock = sock;
231 }
232 static connect(sockUnit) {
233 if (sockUnit.sock) {
234 sockUnit.sock.removeAllListeners();
235 sockUnit.removeAllListeners();
236 }
237 let waitTime = 10 * 1000;
238 if (sockUnit.retryCount < 5) {
239 waitTime = 1000;
240 }
241 else if (sockUnit.retryCount < 10) {
242 waitTime = 2 * 1000;
243 }
244 else if (sockUnit.retryCount < 20) {
245 waitTime = 5 * 1000;
246 }
247 sockUnit.retryCount++;
248 // 重连机制这里添加一个时间间隔
249 timers_1.setTimeout(function () { TcpSocket._createSocket(sockUnit); }, waitTime);
250 return new Promise(function (resolve, reject) {
251 function onError(sockaddr, e) {
252 sockUnit.off('connection', onConnection);
253 reject([sockaddr, e]);
254 }
255 function onConnection(addr) {
256 sockUnit.off('error', onError);
257 resolve([addr]);
258 }
259 sockUnit.once('connection', onConnection);
260 sockUnit.once('error', onError);
261 });
262 }
263 static reConnect(sockUnit) {
264 if (sockUnit.sock) {
265 // sockUnit.sock.removeAllListeners('connect');
266 // sockUnit.sock.removeAllListeners('close');
267 // sockUnit.sock.removeAllListeners('data');
268 // sockUnit.sock.removeAllListeners('error');
269 sockUnit.sock.removeAllListeners();
270 }
271 let waitTime = 10 * 1000;
272 if (sockUnit.retryCount < 5) {
273 waitTime = 1000;
274 }
275 else if (sockUnit.retryCount < 10) {
276 waitTime = 2 * 1000;
277 }
278 else if (sockUnit.retryCount < 20) {
279 waitTime = 5 * 1000;
280 }
281 sockUnit.retryCount++;
282 // 重连机制这里添加一个时间间隔
283 timers_1.setTimeout(function () { TcpSocket._createSocket(sockUnit); }, waitTime);
284 }
285}
286exports.TcpSocket = TcpSocket;