UNPKG

3.23 kBJavaScriptView Raw
1"use strict";
2
3var
4 SocketCache = require('./socket_cache'),
5 net = require('net'),
6 util = require('util');
7
/**
 * Draw a pseudo-random integer in the inclusive range 1..50000.
 * Used to pick request ids; not cryptographically secure (Math.random).
 *
 * @returns {number} integer between 1 and 50000
 */
var random_integer = function () {
  var scaled = Math.random() * 50000;
  return Math.floor(scaled + 1);
};
11
// Idle threshold and polling period, in milliseconds: it is both the
// setTimeout delay of the sweep and the minimum age (now - socket.last)
// a socket must reach before it may be closed.
var SOCKET_TIMEOUT = 300;

/**
 * Per-server request queue that limits how many requests are in flight
 * at once and periodically closes cached sockets that have gone idle.
 *
 * @param {*} parent - passed through unchanged to the SocketCache constructor
 * @param {number} active - maximum number of simultaneously active
 *   requests per server key (stored as `_max_queue`)
 */
var ServerQueue = module.exports = function (parent, active) {
  var self = this;

  this._queue = {};   // server key -> { order: [ids...], <id>: { request, cb } }
  this._active = {};  // server key -> { count: n, <id>: request }
  this._socketCache = new SocketCache(parent);
  this._max_queue = active;

  // Periodic sweep: close every cached socket that has been quiet for
  // longer than SOCKET_TIMEOUT and has nothing queued or in flight.
  var check_sockets = function () {
    var now = Date.now();

    // NOTE(review): assumes the socket cache keys match the keys produced
    // by _hash() and that `socket.last` is a millisecond timestamp of the
    // last activity - confirm against socket_cache.
    Object.keys(self._socketCache._socket).forEach(function (s) {
      var socket = self._socketCache._socket[s];
      var delta = now - socket.last;
      var pending = self._queue[s];
      var inflight = self._active[s];

      // A server that never had a queue/active record created has, by
      // definition, no outstanding work; treat the missing record as idle
      // instead of dereferencing undefined (which threw a TypeError).
      var idle = (!pending || pending.order.length === 0) &&
        (!inflight || inflight.count === 0);

      if (delta > SOCKET_TIMEOUT && idle) {
        self._socketCache.close(s);
      }
    });

    // Keep polling only while sockets remain cached.
    // NOTE(review): once a sweep finds the cache empty the timer chain
    // stops and nothing in this block restarts it - confirm that sockets
    // opened later are reaped elsewhere.
    if (Object.keys(self._socketCache._socket).length) {
      self._timer = setTimeout(check_sockets, SOCKET_TIMEOUT);
    }
  };

  self._timer = setTimeout(check_sockets, SOCKET_TIMEOUT);
};
48
/**
 * Build the bookkeeping key for a server.
 *
 * TCP servers are keyed by "address:port". Every other type is keyed by
 * "udp" plus the IP family reported by net.isIP (0, 4 or 6), so all
 * non-TCP servers of the same family share a single key.
 *
 * @param {{type: string, address: string, port: *}} server
 * @returns {string} key used in `_queue` and `_active`
 */
ServerQueue.prototype._hash = function (server) {
  var isTcp = server.type === 'tcp';
  return isTcp
    ? server.address + ':' + server.port
    : 'udp' + net.isIP(server.address);
};
55
/**
 * Return the waiting-request record for a server, creating an empty one
 * (`{ order: [] }`) the first time the server's key is seen.
 *
 * @param {Object} server - server descriptor accepted by `_hash`
 * @returns {Object} record holding an `order` array of ids plus one
 *   property per queued id
 */
ServerQueue.prototype._getQueue = function (server) {
  var key = this._hash(server);
  var entry = this._queue[key];

  if (!entry) {
    entry = { order: [] };
    this._queue[key] = entry;
  }

  return entry;
};
67
/**
 * Return the in-flight record for a server, creating an empty one
 * (`{ count: 0 }`) the first time the server's key is seen.
 *
 * @param {Object} server - server descriptor accepted by `_hash`
 * @returns {Object} record holding a `count` plus one property per
 *   active request id
 */
ServerQueue.prototype._getActive = function (server) {
  var key = this._hash(server);
  var entry = this._active[key];

  if (!entry) {
    entry = { count: 0 };
    this._active[key] = entry;
  }

  return entry;
};
79
/**
 * Queue a request for a server and immediately try to dispatch it.
 *
 * A random id that is unused in both the waiting and the in-flight
 * records of this server is chosen and written to `request.id`.
 *
 * @param {Object} server - server descriptor accepted by `_hash`
 * @param {Object} request - request object; its `id` property is overwritten
 * @param {Function} cb - callback handed to the socket cache when the
 *   request is dispatched by `fill`
 */
ServerQueue.prototype.add = function (server, request, cb) {
  var queue = this._getQueue(server);
  var active = this._getActive(server);

  // Re-roll until the id collides with neither a waiting nor an active request.
  var id = random_integer();
  while (queue[id] || active[id]) {
    id = random_integer();
  }

  queue[id] = {
    request: request,
    cb: cb,
  };
  // New ids go to the front; pop() takes from the back, giving FIFO order.
  queue.order.unshift(id);
  request.id = id;

  this.fill(server);
};
98
/**
 * Forget a request, whether it is still waiting or already in flight,
 * then try to dispatch more waiting requests for the same server.
 *
 * @param {Object} server - server descriptor accepted by `_hash`
 * @param {number} id - request id previously assigned by `add`
 */
ServerQueue.prototype.remove = function (server, id) {
  var pending = this._getQueue(server);
  var inflight = this._getActive(server);

  // Drop the waiting record and its slot in the ordering, if any.
  delete pending[id];
  var position = pending.order.indexOf(id);
  if (position !== -1) {
    pending.order.splice(position, 1);
  }

  // Release the in-flight slot only when the id was actually active.
  if (inflight[id]) {
    delete inflight[id];
    inflight.count -= 1;
  }

  this.fill(server);
};
117
/**
 * Move the oldest waiting request of a server into the in-flight record.
 *
 * The id is taken from the end of `order`; its request is stored under
 * that id in the active record and the active count is incremented. The
 * waiting record for the id is left in place.
 *
 * @param {Object} server - server descriptor accepted by `_hash`
 * @returns {Function|undefined} the request's callback, or undefined when
 *   nothing is waiting or the id has no waiting record
 */
ServerQueue.prototype.pop = function (server) {
  var pending = this._getQueue(server);
  var inflight = this._getActive(server);

  var id = pending.order.pop();
  var entry = pending[id];

  if (!id || !entry) {
    return;
  }

  inflight[id] = entry.request;
  inflight.count += 1;
  return entry.cb;
};
132
/**
 * Dispatch waiting requests for a server until either the in-flight
 * count reaches `_max_queue` or nothing is left to pop.
 *
 * Each popped callback is handed to the socket cache via
 * `_socketCache.get(server, cb)`.
 *
 * @param {Object} server - server descriptor accepted by `_hash`
 */
ServerQueue.prototype.fill = function (server) {
  var inflight = this._getActive(server);

  while (inflight.count < this._max_queue) {
    var callback = this.pop(server);
    if (!callback) {
      return;
    }
    this._socketCache.get(server, callback);
  }
};
144
/**
 * Look up an in-flight request by id.
 *
 * @param {Object} server - server descriptor accepted by `_hash`
 * @param {number} id - request id assigned by `add`
 * @returns {Object|undefined} the active request stored under `id`, or
 *   undefined when no such request is active for this server
 */
ServerQueue.prototype.getRequest = function (server, id) {
  return this._getActive(server)[id];
};