UNPKG

6.26 kBJavaScriptView Raw
1// Copyright 2012 Timothy J Fontaine <tjfontaine@gmail.com>
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19// THE SOFTWARE
20
21'use strict';
22
var net = require('net'),
    util = require('util'),
    crypto = require('crypto'),
    EventEmitter = require('events').EventEmitter,
    Packet = require('./packet'),
    consts = require('./consts'),
    UDPSocket = require('./utils').UDPSocket,
    TCPSocket = require('./utils').TCPSocket;
30
// Tracing hook called throughout this module. It is a deliberate no-op:
// every argument is ignored and nothing is returned. To get a timestamped
// trace while debugging, re-enable the two statements kept in the body.
var debug = function() {
  // var args = Array.prototype.slice.call(arguments);
  // console.log.apply(this, ['pending', Date.now().toString()].concat(args));
};
35
// Queue that funnels requests through one socket.
//
// socket: transport object. The constructor subscribes to its 'ready',
//         'message' and 'close' events and then calls socket.bind(server).
// server: value handed to socket.bind(); also kept for debug output.
//
// State kept on the instance:
//   _active / _active_count  requests currently in flight, keyed by id
//   _pending                 requests waiting for a free slot
//   _refd                    last ref/unref state requested for the socket
var SocketQueue = function(socket, server) {
  var queue = this;

  debug('created', server);

  queue._active = {};
  queue._active_count = 0;
  queue._pending = [];
  queue._server = server;
  queue._socket = socket;

  // Handlers are attached before bind() so no event can be missed.
  socket.on('ready', queue._onlisten.bind(queue));
  socket.on('message', queue._onmessage.bind(queue));
  socket.on('close', queue._onclose.bind(queue));
  socket.bind(server);

  queue._refd = true;
};
util.inherits(SocketQueue, EventEmitter);
54
// Queue `request` for transmission and try to start it right away.
// The request is appended to the backlog; _fill() decides whether it can be
// dispatched now.
SocketQueue.prototype.send = function(request) {
  var backlog = this._pending;

  debug('added', request.question);
  backlog.push(request);
  this._fill();
};
60
// Withdraw `request` from this queue, whether it is still waiting or already
// in flight, then let the socket be unref'd if nothing is left.
//
// request: the same object that was passed to send(). Its `id` is only
//          meaningful while the request is in flight.
SocketQueue.prototype.remove = function(request) {
  // Drop it from the backlog first: _fill() below may dispatch backlog
  // entries, which would invalidate an index computed beforehand.
  var idx = this._pending.indexOf(request);
  if (idx > -1)
    this._pending.splice(idx, 1);

  // Compare by identity, not just by id. Ids are recycled, so a request that
  // already finished may carry an id that now belongs to a different
  // in-flight request, which must not be evicted.
  if (this._active[request.id] === request) {
    delete this._active[request.id];
    this._active_count -= 1;
    this._fill();
  }

  this._unref();
};
76
// Close the underlying socket and announce it with a 'close' event.
//
// Safe to call more than once: after the first call the socket reference is
// gone and later calls return without doing anything.
SocketQueue.prototype.close = function() {
  if (!this._socket)
    return;

  debug('closing', this._server);
  this._socket.close();
  this._socket = undefined;

  // Cancel the idle-close fallback timer so it cannot fire against a queue
  // that is already closed. Done after socket.close() so a timer armed while
  // the socket was closing is cancelled as well.
  if (this._timer) {
    clearTimeout(this._timer);
    this._timer = null;
  }

  this.emit('close');
};
83
// Move requests from the backlog into flight. Keeps calling _dequeue() until
// the socket is not listening, the backlog is empty, or 100 requests are
// already in flight.
SocketQueue.prototype._fill = function() {
  debug('pre fill, active:', this._active_count, 'pending:',
        this._pending.length);

  for (;;) {
    if (!this._listening)
      break;
    if (!this._pending.length)
      break;
    if (!(this._active_count < 100))
      break;

    this._dequeue();
  }

  debug('post fill, active:', this._active_count, 'pending:',
        this._pending.length);
};
96
// Produce a request id: an unpredictable integer in the range 1..65535.
//
// The id is written into the packet header and later used to match replies
// to requests, so it should be hard to guess. It is drawn from the crypto
// module rather than Math.random(), and covers the full non-zero 16-bit
// range. Zero is rejected (rather than folded in with a modulo) so every
// value stays equally likely and the result is always truthy.
var random_integer = function() {
  var id;

  do {
    id = crypto.randomBytes(2).readUInt16BE(0);
  } while (id === 0);

  return id;
};
100
// Dispatch the oldest waiting request, if there is one.
//
// The request is given an id that is not in use, registered as in flight,
// and sent as a single-question packet with recursion desired. If building
// or sending the packet throws, the in-flight slot is released again and the
// exception is reported through req.error().
SocketQueue.prototype._dequeue = function() {
  // shift(), not pop(): serve the backlog first-in first-out so that the
  // oldest requests are not starved by newer ones under load.
  var req = this._pending.shift();
  var id, packet;

  if (!req)
    return;

  // Re-draw until the id does not collide with a request in flight.
  id = random_integer();
  while (this._active[id])
    id = random_integer();

  debug('sending', req.question, id);

  req.id = id;
  this._active[id] = req;
  this._active_count += 1;

  try {
    packet = new Packet(this._socket.remote(req.server));
    packet.header.id = id;
    packet.header.rd = 1;

    if (req.try_edns) {
      packet.edns_version = 0;
      //TODO when we support dnssec
      //packet.do = 1
    }

    packet.question.push(req.question);
    packet.send();

    this._ref();
  } catch (e) {
    // Nothing is on the wire for this request, so give the slot back before
    // reporting. The identity check keeps this safe if the slot was already
    // released some other way.
    if (this._active[id] === req) {
      delete this._active[id];
      this._active_count -= 1;
    }
    req.error(e);
  }
};
137
// 'message' handler for the socket.
//
// Parses the datagram and, when its header id belongs to a request in
// flight, retires that request, hands it the packet via req.handle(null,
// packet) and refills the queue. Unparseable or unmatched messages are
// ignored. The unref check runs in every case.
SocketQueue.prototype._onmessage = function(msg, remote) {
  var packet, id, match;

  debug('got a message', this._server);

  try {
    packet = Packet.parse(msg, remote);
    id = packet.header.id;
    match = this._active[id];
    debug('associated message', id);
  } catch (err) {
    // A bad packet is not fatal; fall through with no match.
    debug('error parsing packet', err);
  }

  if (match) {
    delete this._active[id];
    this._active_count -= 1;
    match.handle(null, packet);
    this._fill();
  }

  this._unref();
};
160
// Record that the socket no longer needs to be ref'd and, when no request is
// in flight, release it: call socket.unref() when the socket offers it,
// otherwise arm a one-shot 300 ms timer that closes the queue.
//
// Does nothing beyond clearing the flag once the socket has been closed.
SocketQueue.prototype._unref = function() {
  var self = this;
  this._refd = false;

  // close() drops the socket reference; a late event or a late remove() can
  // still land here afterwards and must not throw.
  if (!this._socket)
    return;

  if (this._active_count <= 0) {
    if (this._socket.unref) {
      debug('unrefd socket');
      this._socket.unref();
    } else if (!this._timer) {
      this._timer = setTimeout(function() {
        self._timer = null;
        // Skip if the queue was closed while the timer was pending.
        if (self._socket)
          self.close();
      }, 300);
    }
  }
};
176
// Record that the socket must be kept alive. Calls socket.ref() when the
// socket offers it; otherwise cancels the pending idle-close timer, if any.
SocketQueue.prototype._ref = function() {
  var sock = this._socket;

  this._refd = true;

  if (sock.ref) {
    debug('refd socket');
    sock.ref();
    return;
  }

  if (this._timer) {
    clearTimeout(this._timer);
    this._timer = null;
  }
};
187
// 'ready' handler for the socket: run the unref check, mark the queue as
// listening, then dispatch whatever was queued while the socket was being
// set up.
SocketQueue.prototype._onlisten = function() {
  var queue = this;

  queue._unref();
  queue._listening = true;
  queue._fill();
};
193
// 'close' handler for the socket: stop dispatching and fail every request
// this queue still holds, waiting or in flight, with one shared timeout
// error (message 'getHostByName ' + consts.TIMEOUT, errno consts.TIMEOUT).
SocketQueue.prototype._onclose = function() {
  var waiting = this._pending,
      inflight = this._active,
      err;

  debug('socket closed', this);

  this._listening = false;

  // Detach all bookkeeping before notifying anyone. An error() callback may
  // re-enter this queue (for example through remove() or send()); with the
  // state already reset, that can neither decrement a slot a second time
  // nor keep this loop spinning on freshly queued requests.
  this._pending = [];
  this._active = {};
  this._active_count = 0;

  err = new Error('getHostByName ' + consts.TIMEOUT);
  err.errno = consts.TIMEOUT;

  while (waiting.length)
    waiting.pop().error(err);

  Object.keys(inflight).forEach(function(key) {
    inflight[key].error(err);
  });
};
216
// Compute the key under which a server's queue is shared.
// TCP servers get one key per "address:port". Everything else is keyed by
// address family only: 'udp' followed by the result of net.isIP(address).
var serverHash = function(server) {
  return server.type === 'tcp' ?
      server.address + ':' + server.port :
      'udp' + net.isIP(server.address);
};
223
224var _sockets = {};
225
226exports.send = function(request) {
227 var hash = serverHash(request.server);
228 var socket = _sockets[hash];
229
230 if (!socket) {
231 switch (hash) {
232 case 'udp4':
233 case 'udp6':
234 socket = new SocketQueue(new UDPSocket(), hash);
235 break;
236 default:
237 socket = new SocketQueue(new TCPSocket(), request.server);
238 break;
239 }
240
241 socket.on('close', function() {
242 delete _sockets[hash];
243 });
244
245 _sockets[hash] = socket;
246 }
247
248 socket.send(request);
249};
250
251exports.remove = function(request) {
252 var hash = serverHash(request.server);
253 var socket = _sockets[hash];
254 if (socket) {
255 socket.remove(request);
256 }
257};