1 | #!/usr/bin/env node
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 | 'use strict';
|
12 |
|
13 | var child_process = require('child_process');
|
14 | var net = require('net');
|
15 | var util = require('util');
|
16 | var stream = require('stream');
|
17 | var trace = require('line-trace');
|
18 | var uuidGen = require('node-uuid');
|
19 | var SkaleClient = require('../lib/client.js');
|
20 | var webSocketServer = require('ws').Server;
|
21 | var websocket = require('websocket-stream');
|
22 |
|
23 | var wsid = 1;
|
24 | var expectedWorkers = 0;
|
25 | var workerStock = [];
|
26 | var workerControllers = [];
|
27 | var pendingMasters = [];
|
28 |
|
29 | var opt = require('node-getopt').create([
|
30 | ['h', 'help', 'print this help text'],
|
31 | ['H', 'Host=ARG', 'primary server host (default none)'],
|
32 | ['l', 'local=ARG', 'start local worker controller (default ncpu workers)'],
|
33 | ['m', 'memory=ARG', 'set max memory in MB for workers in local mode (default 1024)'],
|
34 | ['n', 'nworker=ARG', 'start local worker controller (default ncpu workers)'],
|
35 | ['N', 'Name=ARG', 'advertised server name (default localhost)'],
|
36 | ['P', 'Port=ARG', 'primary server port (default none)'],
|
37 | ['p', 'port=ARG', 'server port (default 12346)'],
|
38 | ['w', 'wsport=ARG', 'listen on websocket port (default none)'],
|
39 | ['V', 'version', 'print version']
|
40 | ]).bindHelp().parseSystem();
|
41 |
|
42 | if (opt.options.version) {
|
43 | var pkg = require('../package');
|
44 | return console.log(pkg.name + '-' + pkg.version);
|
45 | }
|
46 |
|
47 | var clients = {};
|
48 | var clientNum = 1;
|
49 | var clientMax = SkaleClient.minMulticast;
|
50 | var minMulticast = SkaleClient.minMulticast;
|
51 | var topics = {};
|
52 | var topicNum = -1;
|
53 | var UInt32Max = 4294967296;
|
54 | var topicMax = UInt32Max - minMulticast;
|
55 | var topicIndex = {};
|
56 | var memory = opt.options.memory || 1024;
|
57 |
|
58 | var port = Number(opt.options.port) || 12346;
|
59 | var wss;
|
60 | var wsport = opt.options.wsport || port + 2;
|
61 | var crossbar = {};
|
62 | var nworker = (opt.options.local > 0) ? opt.options.local : 0;
|
63 | var access = process.env.SKALE_KEY;
|
64 |
|
65 | process.title = 'skale-server ' + port;
|
66 |
|
67 | function SwitchBoard(sock) {
|
68 | if (!(this instanceof SwitchBoard))
|
69 | return new SwitchBoard(sock);
|
70 | stream.Transform.call(this, {objectMode: true});
|
71 | sock.index = getClientNumber();
|
72 | crossbar[sock.index] = sock;
|
73 | this.sock = sock;
|
74 | }
|
75 | util.inherits(SwitchBoard, stream.Transform);
|
76 |
|
77 | SwitchBoard.prototype._transform = function (chunk, encoding, done) {
|
78 | var o = {}, to = chunk.readUInt32LE(0, true);
|
79 | if (to >= minMulticast) {
|
80 | var sub = topics[to - minMulticast].sub, len = sub.length, n = 0;
|
81 | if (len === 0) return done();
|
82 | for (var i in sub) {
|
83 |
|
84 | if (crossbar[sub[i]]) {
|
85 | crossbar[sub[i]].write(chunk, function () {
|
86 | if (++n == len) done();
|
87 | });
|
88 | } else if (--len === 0) done();
|
89 | }
|
90 | } else if (to > 1) {
|
91 | if (crossbar[to])
|
92 | crossbar[to].write(chunk, done);
|
93 | else done();
|
94 | } else if (to === 1) {
|
95 | } else if (to === 0) {
|
96 | try {
|
97 | o = JSON.parse(chunk.slice(8));
|
98 | } catch (error) {
|
99 | console.error(error);
|
100 | return done();
|
101 | }
|
102 | if (!(o.cmd in clientRequest)) {
|
103 | o.error = 'Invalid command: ' + o.cmd;
|
104 | o.cmd = 'reply';
|
105 | this.sock.write(SkaleClient.encode(o), done);
|
106 | } else if (clientRequest[o.cmd](this.sock, o)) {
|
107 | o.cmd = 'reply';
|
108 | this.sock.write(SkaleClient.encode(o), done);
|
109 | } else done();
|
110 | }
|
111 | };
|
112 |
|
113 |
|
114 |
|
115 | var clientRequest = {
|
116 | connect: function (sock, msg) {
|
117 | var i, ret = true, master;
|
118 | if (access && msg.access != access) {
|
119 | console.log('## Skale connect failed: access denied');
|
120 | msg.error = 'access denied, check SKALE_KEY';
|
121 | return true;
|
122 | }
|
123 | register(null, msg, sock);
|
124 | if (msg.data.query) msg.data.devices = devices(msg);
|
125 | if (msg.data.notify in clients && clients[msg.data.notify].sock) {
|
126 | clients[msg.data.notify].sock.write(SkaleClient.encode({cmd: 'notify', data: msg.data}));
|
127 | clients[msg.data.notify].closeListeners[msg.data.uuid] = true;
|
128 | }
|
129 | switch (msg.data.type) {
|
130 | case 'worker-controller':
|
131 | msg.data.wsid = wsid;
|
132 | expectedWorkers += msg.data.ncpu;
|
133 | workerControllers.push(msg.data);
|
134 | break;
|
135 | case 'worker':
|
136 | if (wsid == msg.data.wsid) {
|
137 | workerStock.push(msg.data);
|
138 | if (pendingMasters.length && workerStock.length >= expectedWorkers) {
|
139 | master = pendingMasters.shift();
|
140 | master.data.devices = workerStock;
|
141 | master.cmd = 'reply';
|
142 | if (clients[master.data.uuid].sock)
|
143 | clients[master.data.uuid].sock.write(SkaleClient.encode(master));
|
144 | postMaster(master.data.uuid);
|
145 | }
|
146 | }
|
147 | break;
|
148 | case 'master':
|
149 | if (expectedWorkers && workerStock.length >= expectedWorkers) {
|
150 | msg.data.devices = workerStock;
|
151 | postMaster(msg.data.uuid);
|
152 | } else {
|
153 | pendingMasters.push(msg);
|
154 | ret = false;
|
155 | }
|
156 | break;
|
157 | }
|
158 | console.log('## Connect', msg.data.type, msg.data.id, msg.data.uuid);
|
159 | return ret;
|
160 |
|
161 | function postMaster(muuid) {
|
162 | var wuuid;
|
163 |
|
164 | for (i = 0; i < workerStock.length; i++) {
|
165 | wuuid = workerStock[i].uuid;
|
166 | clients[muuid].closeListeners[wuuid] = true;
|
167 | clients[wuuid].closeListeners[muuid] = true;
|
168 | }
|
169 |
|
170 | wsid++;
|
171 | workerStock = [];
|
172 | for (i = 0; i < workerControllers.length; i++) {
|
173 | clients[workerControllers[i].uuid].sock.write(SkaleClient.encode({
|
174 | cmd: 'getWorker',
|
175 | wsid: wsid,
|
176 | n: workerControllers[i].ncpu
|
177 | }));
|
178 | }
|
179 | }
|
180 | },
|
181 | devices: function (sock, msg) {
|
182 | msg.ufrom = sock.client.uuid;
|
183 | msg.data = devices(msg);
|
184 | return true;
|
185 | },
|
186 | end: function (sock) {
|
187 | sock.client.end = true;
|
188 | return false;
|
189 | },
|
190 | get: function (sock, msg) {
|
191 | msg.data = clients[msg.data] ? clients[msg.data].data : null;
|
192 | return true;
|
193 | },
|
194 | id: function (sock, msg) {
|
195 | msg.data = msg.data in clients ? clients[msg.data].index : null;
|
196 | return true;
|
197 | },
|
198 | notify: function (sock, msg) {
|
199 | if (clients[msg.data])
|
200 | clients[msg.data].closeListeners[sock.client.uuid] = true;
|
201 | return false;
|
202 | },
|
203 | set: function (sock, msg) {
|
204 | if (typeof msg.data != 'object') return false;
|
205 | for (var i in msg.data)
|
206 | sock.client.data[i] = msg.data[i];
|
207 | pubmon({event: 'set', uuid: sock.client.uuid, data: msg.data});
|
208 | return false;
|
209 | },
|
210 | subscribe: function (sock, msg) {
|
211 | subscribe(sock.client, msg.data);
|
212 | return false;
|
213 | },
|
214 | tid: function (sock, msg) {
|
215 | var topic = msg.data;
|
216 | var n = msg.data = getTopicId(topic);
|
217 |
|
218 | if (!topics[n].owner) {
|
219 | topics[n].owner = sock.client;
|
220 | sock.client.topics[n] = true;
|
221 | }
|
222 | return true;
|
223 | },
|
224 | unsubscribe: function (sock, msg) {
|
225 | unsubscribe(sock.client, msg.data);
|
226 | return false;
|
227 | }
|
228 | };
|
229 |
|
230 |
|
231 | var mstream = new SwitchBoard({});
|
232 | var monid = getTopicId('monitoring') + minMulticast;
|
233 | function pubmon(data) {
|
234 | mstream.write(SkaleClient.encode({cmd: 'monitoring', id: monid, data: data}));
|
235 | }
|
236 |
|
237 | process.on('uncaughtException', function uncaughtException(err) {
|
238 | trace(err);
|
239 | console.error(err.stack);
|
240 | });
|
241 |
|
242 | process.on('SIGTERM', function sigterm() {
|
243 | trace('terminated, exit');
|
244 | process.exit();
|
245 | });
|
246 |
|
247 | console.log('## Started', Date());
|
248 |
|
249 | if (port) {
|
250 | net.createServer(handleConnect).listen(port);
|
251 | console.log('## Listening TCP on', port);
|
252 | }
|
253 |
|
254 |
|
255 | if (wsport) {
|
256 | console.log('## Listening WebSocket on', wsport);
|
257 | wss = new webSocketServer({port: wsport});
|
258 | wss.on('connection', function (ws) {
|
259 | var sock = websocket(ws);
|
260 | sock.ws = true;
|
261 | handleConnect(sock);
|
262 |
|
263 | ws.on('close', function () {
|
264 | handleClose(sock);
|
265 | });
|
266 | ws.on('error', function (error) {
|
267 | console.log('## websocket connection error', error.stack);
|
268 | handleClose(sock);
|
269 | });
|
270 | });
|
271 | }
|
272 |
|
273 |
|
274 | if (opt.options.local) startWorker();
|
275 |
|
276 | function startWorker() {
|
277 | var worker = child_process.spawn(
|
278 | __dirname + '/worker.js',
|
279 | ['-P', port, '-n', nworker, '-m', memory],
|
280 | {stdio: 'inherit'}
|
281 | );
|
282 | worker.on('close', startWorker);
|
283 | }
|
284 |
|
285 | function handleClose(sock) {
|
286 | var i, cli = sock.client;
|
287 | if (cli) {
|
288 | console.log('## Close:', cli.data.type, cli.index, cli.uuid);
|
289 | pubmon({event: 'disconnect', uuid: cli.uuid});
|
290 | cli.sock = null;
|
291 | switch (cli.data.type) {
|
292 | case 'worker-controller':
|
293 |
|
294 | expectedWorkers -= cli.data.ncpu;
|
295 | for (i = 0; i < workerControllers.length; i++) {
|
296 | if (cli.uuid == workerControllers[i].uuid) {
|
297 | workerControllers.splice(i, 1);
|
298 | }
|
299 | }
|
300 | break;
|
301 | case 'worker':
|
302 |
|
303 | for (i = 0; i < workerStock.length; i++) {
|
304 | if (cli.uuid == workerStock[i].uuid)
|
305 | workerStock.splice(i, 1);
|
306 | }
|
307 | break;
|
308 | case 'master':
|
309 |
|
310 | for (i in pendingMasters) {
|
311 | if (pendingMasters[i].data.uuid == cli.uuid) {
|
312 | pendingMasters.splice(i, 1);
|
313 | break;
|
314 | }
|
315 | }
|
316 | break;
|
317 | }
|
318 | for (i in cli.closeListeners) {
|
319 | if (i in clients && clients[i].sock)
|
320 | clients[i].sock.write(SkaleClient.encode({cmd: 'remoteClose', data: cli.uuid}));
|
321 | }
|
322 | for (i in cli.topics) {
|
323 | delete topicIndex[topics[i].name];
|
324 | delete topics[i];
|
325 | }
|
326 | if (cli.end) delete clients[cli.uuid];
|
327 | } else {
|
328 | console.log('## Close:', sock._peername);
|
329 | }
|
330 | if (sock.index) delete crossbar[sock.index];
|
331 | sock.removeAllListeners();
|
332 | }
|
333 |
|
334 | function handleConnect(sock) {
|
335 | if (sock.ws) {
|
336 | console.log('## Connect websocket from', sock.socket.upgradeReq.headers.origin);
|
337 | } else {
|
338 | console.log('## Connect tcp', sock.remoteAddress, sock.remotePort);
|
339 | sock.setNoDelay();
|
340 | }
|
341 | sock.on('end', function () {
|
342 | handleClose(sock);
|
343 | });
|
344 | sock.on('error', function sockError(error) {
|
345 | console.log('## connection error', error.stack);
|
346 | handleClose(sock);
|
347 | });
|
348 | sock.pipe(new SkaleClient.FromGrid()).pipe(new SwitchBoard(sock));
|
349 | }
|
350 |
|
351 | function getClientNumber() {
|
352 | var n = 100000;
|
353 | do {
|
354 | clientNum = (clientNum < clientMax) ? clientNum + 1 : 2;
|
355 | } while (clientNum in crossbar && --n);
|
356 | if (!n) throw new Error('getClientNumber failed');
|
357 | return clientNum;
|
358 | }
|
359 |
|
360 | function register(from, msg, sock)
|
361 | {
|
362 | var uuid = msg.uuid || uuidGen.v1();
|
363 | sock.client = clients[uuid] = {
|
364 | index: sock.index,
|
365 | uuid: uuid,
|
366 | owner: from ? from : uuid,
|
367 | data: msg.data || {},
|
368 | sock: sock,
|
369 | topics: {},
|
370 | closeListeners: {}
|
371 | };
|
372 | pubmon({event: 'connect', uuid: uuid, data: msg.data});
|
373 |
|
374 | if (!msg.data) msg.data = {};
|
375 | msg.data.uuid = uuid;
|
376 | msg.data.token = 0;
|
377 | msg.data.id = sock.index;
|
378 | }
|
379 |
|
380 | function devices(msg) {
|
381 | var query = msg.data.query, result = [];
|
382 |
|
383 | for (var i in clients) {
|
384 | if (!clients[i].sock) continue;
|
385 | var match = true;
|
386 | for (var j in query) {
|
387 | if (!clients[i].data || clients[i].data[j] != query[j]) {
|
388 | match = false;
|
389 | break;
|
390 | }
|
391 | }
|
392 | if (match) {
|
393 | result.push({
|
394 | uuid: i,
|
395 | id: clients[i].index,
|
396 | ip: clients[i].sock.remoteAddress,
|
397 | data: clients[i].data
|
398 | });
|
399 | }
|
400 | }
|
401 | return result;
|
402 | }
|
403 |
|
404 | function getTopicId(topic) {
|
405 | if (topic in topicIndex) return topicIndex[topic];
|
406 | var n = 10000;
|
407 | do {
|
408 | topicNum = (topicNum < topicMax) ? topicNum + 1 : 0;
|
409 | } while (topicNum in topics && --n);
|
410 | if (!n) throw new Error('getTopicId failed');
|
411 | topics[topicNum] = {name: topic, id: topicNum, sub: []};
|
412 | topicIndex[topic] = topicNum;
|
413 | return topicIndex[topic];
|
414 | }
|
415 |
|
416 | function subscribe(client, topic) {
|
417 | var sub = topics[getTopicId(topic)].sub;
|
418 | if (sub.indexOf(client.index) < 0)
|
419 | sub.push(client.index);
|
420 | }
|
421 |
|
422 | function unsubscribe(client, topic) {
|
423 | if (!(topic in topicIndex)) return;
|
424 | var sub = topics[topicIndex[topic]].sub, i = sub.indexOf(client.index);
|
425 | if (i >= 0) sub.splice(i, 1);
|
426 | }
|