1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 | 'use strict';
|
22 |
|
23 | var net = require('net'),
|
24 | util = require('util'),
|
25 | EventEmitter = require('events').EventEmitter,
|
26 | Packet = require('./packet'),
|
27 | consts = require('./consts'),
|
28 | UDPSocket = require('./utils').UDPSocket,
|
29 | TCPSocket = require('./utils').TCPSocket;
|
30 |
|
31 | var debug = function() {
|
32 |
|
33 |
|
34 | };
|
35 |
|
36 | var SocketQueue = function(socket, server) {
|
37 | this._active = {};
|
38 | this._active_count = 0;
|
39 | this._pending = [];
|
40 |
|
41 | debug('created', server);
|
42 |
|
43 | this._server = server;
|
44 |
|
45 | this._socket = socket;
|
46 | this._socket.on('ready', this._onlisten.bind(this));
|
47 | this._socket.on('message', this._onmessage.bind(this));
|
48 | this._socket.on('close', this._onclose.bind(this));
|
49 | this._socket.bind(server);
|
50 |
|
51 | this._refd = true;
|
52 | };
|
53 | util.inherits(SocketQueue, EventEmitter);
|
54 |
|
55 | SocketQueue.prototype.send = function(request) {
|
56 | debug('added', request.question);
|
57 | this._pending.push(request);
|
58 | this._fill();
|
59 | };
|
60 |
|
61 | SocketQueue.prototype.remove = function(request) {
|
62 | var req = this._active[request.id];
|
63 | var idx = this._pending.indexOf(request);
|
64 |
|
65 | if (req) {
|
66 | delete this._active[request.id];
|
67 | this._active_count -= 1;
|
68 | this._fill();
|
69 | }
|
70 |
|
71 | if (idx > -1)
|
72 | this._pending.splice(idx, 1);
|
73 |
|
74 | this._unref();
|
75 | };
|
76 |
|
77 | SocketQueue.prototype.close = function() {
|
78 | debug('closing', this._server);
|
79 | this._socket.close();
|
80 | this._socket = undefined;
|
81 | this.emit('close');
|
82 | };
|
83 |
|
84 | SocketQueue.prototype._fill = function() {
|
85 | debug('pre fill, active:', this._active_count, 'pending:',
|
86 | this._pending.length);
|
87 |
|
88 | while (this._listening && this._pending.length &&
|
89 | this._active_count < 100) {
|
90 | this._dequeue();
|
91 | }
|
92 |
|
93 | debug('post fill, active:', this._active_count, 'pending:',
|
94 | this._pending.length);
|
95 | };
|
96 |
|
97 | var random_integer = function() {
|
98 | return Math.floor(Math.random() * 50000 + 1);
|
99 | };
|
100 |
|
101 | SocketQueue.prototype._dequeue = function() {
|
102 | var req = this._pending.pop();
|
103 | var id, packet, dnssocket;
|
104 |
|
105 | if (req) {
|
106 | id = random_integer();
|
107 |
|
108 | while (this._active[id])
|
109 | id = random_integer();
|
110 |
|
111 | debug('sending', req.question, id);
|
112 |
|
113 | req.id = id;
|
114 | this._active[id] = req;
|
115 | this._active_count += 1;
|
116 |
|
117 | try {
|
118 | packet = new Packet(this._socket.remote(req.server));
|
119 | packet.header.id = id;
|
120 | packet.header.rd = 1;
|
121 |
|
122 | if (req.try_edns) {
|
123 | packet.edns_version = 0;
|
124 |
|
125 |
|
126 | }
|
127 |
|
128 | packet.question.push(req.question);
|
129 | packet.send();
|
130 |
|
131 | this._ref();
|
132 | } catch (e) {
|
133 | req.error(e);
|
134 | }
|
135 | }
|
136 | };
|
137 |
|
138 | SocketQueue.prototype._onmessage = function(msg, remote) {
|
139 | var req;
|
140 |
|
141 | debug('got a message', this._server);
|
142 |
|
143 | try {
|
144 | var packet = Packet.parse(msg, remote);
|
145 | req = this._active[packet.header.id];
|
146 | debug('associated message', packet.header.id);
|
147 | } catch (e) {
|
148 | debug('error parsing packet', e);
|
149 | }
|
150 |
|
151 | if (req) {
|
152 | delete this._active[packet.header.id];
|
153 | this._active_count -= 1;
|
154 | req.handle(null, packet);
|
155 | this._fill();
|
156 | }
|
157 |
|
158 | this._unref();
|
159 | };
|
160 |
|
161 | SocketQueue.prototype._unref = function() {
|
162 | var self = this;
|
163 | this._refd = false;
|
164 |
|
165 | if (this._active_count <= 0) {
|
166 | if (this._socket.unref) {
|
167 | debug('unrefd socket');
|
168 | this._socket.unref();
|
169 | } else if (!this._timer) {
|
170 | this._timer = setTimeout(function() {
|
171 | self.close();
|
172 | }, 300);
|
173 | }
|
174 | }
|
175 | };
|
176 |
|
177 | SocketQueue.prototype._ref = function() {
|
178 | this._refd = true;
|
179 | if (this._socket.ref) {
|
180 | debug('refd socket');
|
181 | this._socket.ref();
|
182 | } else if (this._timer) {
|
183 | clearTimeout(this._timer);
|
184 | this._timer = null;
|
185 | }
|
186 | };
|
187 |
|
188 | SocketQueue.prototype._onlisten = function() {
|
189 | this._unref();
|
190 | this._listening = true;
|
191 | this._fill();
|
192 | };
|
193 |
|
194 | SocketQueue.prototype._onclose = function() {
|
195 | var req, err, self = this;
|
196 |
|
197 | debug('socket closed', this);
|
198 |
|
199 | this._listening = false;
|
200 |
|
201 | err = new Error('getHostByName ' + consts.TIMEOUT);
|
202 | err.errno = consts.TIMEOUT;
|
203 |
|
204 | while (this._pending.length) {
|
205 | req = this._pending.pop();
|
206 | req.error(err);
|
207 | }
|
208 |
|
209 | Object.keys(this._active).forEach(function(key) {
|
210 | var req = self._active[key];
|
211 | req.error(err);
|
212 | delete self._active[key];
|
213 | self._active_count -= 1;
|
214 | });
|
215 | };
|
216 |
|
217 | var serverHash = function(server) {
|
218 | if (server.type === 'tcp')
|
219 | return server.address + ':' + server.port;
|
220 | else
|
221 | return 'udp' + net.isIP(server.address);
|
222 | };
|
223 |
|
224 | var _sockets = {};
|
225 |
|
226 | exports.send = function(request) {
|
227 | var hash = serverHash(request.server);
|
228 | var socket = _sockets[hash];
|
229 |
|
230 | if (!socket) {
|
231 | switch (hash) {
|
232 | case 'udp4':
|
233 | case 'udp6':
|
234 | socket = new SocketQueue(new UDPSocket(), hash);
|
235 | break;
|
236 | default:
|
237 | socket = new SocketQueue(new TCPSocket(), request.server);
|
238 | break;
|
239 | }
|
240 |
|
241 | socket.on('close', function() {
|
242 | delete _sockets[hash];
|
243 | });
|
244 |
|
245 | _sockets[hash] = socket;
|
246 | }
|
247 |
|
248 | socket.send(request);
|
249 | };
|
250 |
|
251 | exports.remove = function(request) {
|
252 | var hash = serverHash(request.server);
|
253 | var socket = _sockets[hash];
|
254 | if (socket) {
|
255 | socket.remove(request);
|
256 | }
|
257 | };
|