UNPKG

3.91 kBJavaScriptView Raw
1var
2 Socket = require('./socket'),
3 TCPMessage = require('./tcp_message'),
4 net = require('net'),
5 dgram = require('dgram');
6
7var SocketCache = module.exports = function (parent) {
8 this._pending = {};
9 this._socket = {};
10 this._parent = parent;
11};
12
13SocketCache.prototype._hash = function (server) {
14 if (server.type === 'tcp')
15 return server.address + ':' + server.port;
16 else
17 return 'udp' + net.isIP(server.address);
18};
19
20SocketCache.prototype._getPending = function (server) {
21 var name = this._hash(server);
22 return this._pending[name];
23};
24
25SocketCache.prototype._pendingAdd = function (server, cb) {
26 var name = this._hash(server);
27 if (!this._pending[name]) {
28 this._pending[name] = [];
29 }
30 this._pending[name].push(cb);
31};
32
33SocketCache.prototype._pendingRemove = function (server) {
34 var name = this._hash(server);
35 delete this._pending[name];
36};
37
38SocketCache.prototype._toInternalSocket = function (server, socket) {
39 var S;
40
41 if (server.type === 'tcp') {
42 S = new Socket(null, socket);
43 } else {
44 S = new Socket(socket, server);
45 }
46
47 return S;
48};
49
50SocketCache.prototype._pendingEmit = function (server, socket) {
51 var S, pending, self = this;
52 pending = this._getPending(server);
53 if (pending) {
54 self._socketAdd(server, socket);
55 this._pendingRemove(server);
56 S = this._toInternalSocket(server, socket);
57 pending.forEach(function (cb) {
58 cb(S);
59 });
60 }
61};
62
63SocketCache.prototype._getSocket = function (server) {
64 var name = this._hash(server);
65 return this._socket[name];
66};
67
68SocketCache.prototype._socketRemoveInternal = function (shash, socket) {
69 if (socket) {
70 delete this._socket[shash];
71 if (socket.socket.end) {
72 socket.socket.end();
73 } else {
74 socket.socket.close();
75 }
76 }
77};
78
79SocketCache.prototype._socketRemove = function (server) {
80 var cache_name = this._hash(server);
81 var socket = this._getSocket(server);
82 this._socketRemoveInternal(cache_name, socket);
83};
84
85SocketCache.prototype._socketAdd = function (server, socket) {
86 var self = this;
87 var cache_name = this._hash(server);
88 this._socket[cache_name] = {
89 last: new Date().getTime(),
90 socket: socket,
91 };
92};
93
94SocketCache.prototype._createTcp = function (server) {
95 var socket, self = this, tcp;
96 socket = net.connect(server.port, server.address);
97
98 socket.on('timeout', function () {
99 self._pendingRemove(server);
100 self._socketRemove(server);
101 });
102
103 socket.on('close', function () {
104 self._pendingRemove(server);
105 self._socketRemove(server);
106 });
107
108 socket.on('connect', function () {
109 self._pendingEmit(server, socket);
110 });
111
112 tcp = new TCPMessage(socket, function (msg, socket) {
113 self._parent.handleMessage(server, msg, socket);
114 });
115};
116
117SocketCache.prototype._createUdp = function (server) {
118 var socket, self = this,
119 type = net.isIP(server.address);
120 if (type) {
121 socket = dgram.createSocket('udp' + type);
122 socket.on('message', function (msg, remote) {
123 self._parent.handleMessage(server, msg, new Socket(socket, remote));
124 });
125 socket.on('close', function () {
126 self._socketRemove(server);
127 });
128 socket.on('listening', function () {
129 //self._socketAdd(server, socket);
130 self._pendingEmit(server, socket);
131 });
132 socket.bind()
133 }
134};
135
136SocketCache.prototype.get = function (server, cb) {
137 var socket, pending, S;
138
139 socket = this._getSocket(server);
140 pending = this._getPending(server);
141
142 if (!socket) {
143 this._pendingAdd(server, cb);
144 if (!pending) {
145 if (server.type === 'tcp') {
146 this._createTcp(server);
147 } else {
148 this._createUdp(server);
149 }
150 }
151 } else {
152 socket.last = new Date().getTime();
153 S = this._toInternalSocket(server, socket.socket);
154 cb(S);
155 }
156};
157
158SocketCache.prototype.close = function (shash) {
159 var socket = this._socket[shash];
160 this._socketRemoveInternal(shash, socket);
161};