UNPKG

12.1 kBJavaScriptView Raw
1#!/usr/bin/env node
2
3// Copyright 2016 Luca-SAS, licensed under the Apache License 2.0
4
5// Todo:
6// - record/replay input messages
7// - handle foreign messages
8// - statistics in monitoring
9// - topics permissions (who can publish / subscribe)
10
11'use strict';
12
13var child_process = require('child_process');
14var net = require('net');
15var util = require('util');
16var stream = require('stream');
17var trace = require('line-trace');
18var uuidGen = require('node-uuid');
19var SkaleClient = require('../lib/client.js');
20var webSocketServer = require('ws').Server;
21var websocket = require('websocket-stream');
22
23var wsid = 1; // worker stock id
24var expectedWorkers = 0; // number of expected workers per stock
25var workerStock = [];
26var workerControllers = [];
27var pendingMasters = [];
28
29var opt = require('node-getopt').create([
30 ['h', 'help', 'print this help text'],
31 ['H', 'Host=ARG', 'primary server host (default none)'],
32 ['l', 'local=ARG', 'start local worker controller (default ncpu workers)'],
33 ['m', 'memory=ARG', 'set max memory in MB for workers in local mode (default 1024)'],
34 ['n', 'nworker=ARG', 'start local worker controller (default ncpu workers)'],
35 ['N', 'Name=ARG', 'advertised server name (default localhost)'],
36 ['P', 'Port=ARG', 'primary server port (default none)'],
37 ['p', 'port=ARG', 'server port (default 12346)'],
38 ['w', 'wsport=ARG', 'listen on websocket port (default none)'],
39 ['V', 'version', 'print version']
40]).bindHelp().parseSystem();
41
42if (opt.options.version) {
43 var pkg = require('../package');
44 return console.log(pkg.name + '-' + pkg.version);
45}
46
47var clients = {};
48var clientNum = 1;
49var clientMax = SkaleClient.minMulticast;
50var minMulticast = SkaleClient.minMulticast;
51var topics = {};
52var topicNum = -1;
53var UInt32Max = 4294967296;
54var topicMax = UInt32Max - minMulticast;
55var topicIndex = {};
56var memory = opt.options.memory || 1024;
57//var name = opt.options.name || 'localhost'; // Unused until FT comes back
58var port = Number(opt.options.port) || 12346;
59var wss;
60var wsport = opt.options.wsport || port + 2;
61var crossbar = {};
62var nworker = (opt.options.local > 0) ? opt.options.local : 0;
63var access = process.env.SKALE_KEY;
64
65process.title = 'skale-server ' + port;
66
67function SwitchBoard(sock) {
68 if (!(this instanceof SwitchBoard))
69 return new SwitchBoard(sock);
70 stream.Transform.call(this, {objectMode: true});
71 sock.index = getClientNumber();
72 crossbar[sock.index] = sock;
73 this.sock = sock;
74}
75util.inherits(SwitchBoard, stream.Transform);
76
77SwitchBoard.prototype._transform = function (chunk, encoding, done) {
78 var o = {}, to = chunk.readUInt32LE(0, true);
79 if (to >= minMulticast) { // Multicast
80 var sub = topics[to - minMulticast].sub, len = sub.length, n = 0;
81 if (len === 0) return done();
82 for (var i in sub) {
83 // Flow control: adjust to the slowest receiver
84 if (crossbar[sub[i]]) {
85 crossbar[sub[i]].write(chunk, function () {
86 if (++n == len) done();
87 });
88 } else if (--len === 0) done();
89 }
90 } else if (to > 1) { // Unicast
91 if (crossbar[to])
92 crossbar[to].write(chunk, done);
93 else done();
94 } else if (to === 1) { // Foreign (to be done)
95 } else if (to === 0) { // Server request
96 try {
97 o = JSON.parse(chunk.slice(8));
98 } catch (error) {
99 console.error(error);
100 return done();
101 }
102 if (!(o.cmd in clientRequest)) {
103 o.error = 'Invalid command: ' + o.cmd;
104 o.cmd = 'reply';
105 this.sock.write(SkaleClient.encode(o), done);
106 } else if (clientRequest[o.cmd](this.sock, o)) {
107 o.cmd = 'reply';
108 this.sock.write(SkaleClient.encode(o), done);
109 } else done();
110 }
111};
112
113// Client requests functions, return true if a response must be sent
114// to client, false otherwise. Reply data, if any, must be set in msg.data.
115var clientRequest = {
116 connect: function (sock, msg) {
117 var i, ret = true, master;
118 if (access && msg.access != access) {
119 console.log('## Skale connect failed: access denied');
120 msg.error = 'access denied, check SKALE_KEY';
121 return true;
122 }
123 register(null, msg, sock);
124 if (msg.data.query) msg.data.devices = devices(msg);
125 if (msg.data.notify in clients && clients[msg.data.notify].sock) {
126 clients[msg.data.notify].sock.write(SkaleClient.encode({cmd: 'notify', data: msg.data}));
127 clients[msg.data.notify].closeListeners[msg.data.uuid] = true;
128 }
129 switch (msg.data.type) {
130 case 'worker-controller':
131 msg.data.wsid = wsid;
132 expectedWorkers += msg.data.ncpu;
133 workerControllers.push(msg.data);
134 break;
135 case 'worker':
136 if (wsid == msg.data.wsid) {
137 workerStock.push(msg.data);
138 if (pendingMasters.length && workerStock.length >= expectedWorkers) {
139 master = pendingMasters.shift();
140 master.data.devices = workerStock;
141 master.cmd = 'reply';
142 if (clients[master.data.uuid].sock)
143 clients[master.data.uuid].sock.write(SkaleClient.encode(master));
144 postMaster(master.data.uuid);
145 }
146 }
147 break;
148 case 'master':
149 if (expectedWorkers && workerStock.length >= expectedWorkers) {
150 msg.data.devices = workerStock;
151 postMaster(msg.data.uuid);
152 } else {
153 pendingMasters.push(msg);
154 ret = false;
155 }
156 break;
157 }
158 console.log('## Connect', msg.data.type, msg.data.id, msg.data.uuid);
159 return ret;
160
161 function postMaster(muuid) {
162 var wuuid;
163 // Setup notifications to terminate workers on master end
164 for (i = 0; i < workerStock.length; i++) {
165 wuuid = workerStock[i].uuid;
166 clients[muuid].closeListeners[wuuid] = true;
167 clients[wuuid].closeListeners[muuid] = true;
168 }
169 // Pre-fork new workers to renew the stock
170 wsid++;
171 workerStock = [];
172 for (i = 0; i < workerControllers.length; i++) {
173 clients[workerControllers[i].uuid].sock.write(SkaleClient.encode({
174 cmd: 'getWorker',
175 wsid: wsid,
176 n: workerControllers[i].ncpu
177 }));
178 }
179 }
180 },
181 devices: function (sock, msg) {
182 msg.ufrom = sock.client.uuid;
183 msg.data = devices(msg);
184 return true;
185 },
186 end: function (sock) {
187 sock.client.end = true;
188 return false;
189 },
190 get: function (sock, msg) {
191 msg.data = clients[msg.data] ? clients[msg.data].data : null;
192 return true;
193 },
194 id: function (sock, msg) {
195 msg.data = msg.data in clients ? clients[msg.data].index : null;
196 return true;
197 },
198 notify: function (sock, msg) {
199 if (clients[msg.data])
200 clients[msg.data].closeListeners[sock.client.uuid] = true;
201 return false;
202 },
203 set: function (sock, msg) {
204 if (typeof msg.data != 'object') return false;
205 for (var i in msg.data)
206 sock.client.data[i] = msg.data[i];
207 pubmon({event: 'set', uuid: sock.client.uuid, data: msg.data});
208 return false;
209 },
210 subscribe: function (sock, msg) {
211 subscribe(sock.client, msg.data);
212 return false;
213 },
214 tid: function (sock, msg) {
215 var topic = msg.data;
216 var n = msg.data = getTopicId(topic);
217 // First to publish becomes topic owner
218 if (!topics[n].owner) {
219 topics[n].owner = sock.client;
220 sock.client.topics[n] = true;
221 }
222 return true;
223 },
224 unsubscribe: function (sock, msg) {
225 unsubscribe(sock.client, msg.data);
226 return false;
227 }
228};
229
230// Create a source stream and topic for monitoring info publishing
231var mstream = new SwitchBoard({});
232var monid = getTopicId('monitoring') + minMulticast;
233function pubmon(data) {
234 mstream.write(SkaleClient.encode({cmd: 'monitoring', id: monid, data: data}));
235}
236
237process.on('uncaughtException', function uncaughtException(err) {
238 trace(err);
239 console.error(err.stack);
240});
241
242process.on('SIGTERM', function sigterm() {
243 trace('terminated, exit');
244 process.exit();
245});
246
247console.log('## Started', Date());
248// Start a TCP server
249if (port) {
250 net.createServer(handleConnect).listen(port);
251 console.log('## Listening TCP on', port);
252}
253
254// Start a websocket server if a listening port is specified on command line
255if (wsport) {
256 console.log('## Listening WebSocket on', wsport);
257 wss = new webSocketServer({port: wsport});
258 wss.on('connection', function (ws) {
259 var sock = websocket(ws);
260 sock.ws = true;
261 handleConnect(sock);
262 // Catch error/close at websocket level in addition to stream level
263 ws.on('close', function () {
264 handleClose(sock);
265 });
266 ws.on('error', function (error) {
267 console.log('## websocket connection error', error.stack);
268 handleClose(sock);
269 });
270 });
271}
272
273// Start local workers if required
274if (opt.options.local) startWorker();
275
276function startWorker() {
277 var worker = child_process.spawn(
278 __dirname + '/worker.js',
279 ['-P', port, '-n', nworker, '-m', memory],
280 {stdio: 'inherit'}
281 );
282 worker.on('close', startWorker);
283}
284
285function handleClose(sock) {
286 var i, cli = sock.client;
287 if (cli) {
288 console.log('## Close:', cli.data.type, cli.index, cli.uuid);
289 pubmon({event: 'disconnect', uuid: cli.uuid});
290 cli.sock = null;
291 switch (cli.data.type) {
292 case 'worker-controller':
293 // Resize stock capacity
294 expectedWorkers -= cli.data.ncpu;
295 for (i = 0; i < workerControllers.length; i++) {
296 if (cli.uuid == workerControllers[i].uuid) {
297 workerControllers.splice(i, 1);
298 }
299 }
300 break;
301 case 'worker':
302 // Remove worker from stock
303 for (i = 0; i < workerStock.length; i++) {
304 if (cli.uuid == workerStock[i].uuid)
305 workerStock.splice(i, 1);
306 }
307 break;
308 case 'master':
309 // Remove master from pending masters, avoiding future useless workers start
310 for (i in pendingMasters) {
311 if (pendingMasters[i].data.uuid == cli.uuid) {
312 pendingMasters.splice(i, 1);
313 break;
314 }
315 }
316 break;
317 }
318 for (i in cli.closeListeners) {
319 if (i in clients && clients[i].sock)
320 clients[i].sock.write(SkaleClient.encode({cmd: 'remoteClose', data: cli.uuid}));
321 }
322 for (i in cli.topics) { // Remove owned topics
323 delete topicIndex[topics[i].name];
324 delete topics[i];
325 }
326 if (cli.end) delete clients[cli.uuid];
327 } else {
328 console.log('## Close:', sock._peername);
329 }
330 if (sock.index) delete crossbar[sock.index];
331 sock.removeAllListeners();
332}
333
334function handleConnect(sock) {
335 if (sock.ws) {
336 console.log('## Connect websocket from', sock.socket.upgradeReq.headers.origin);
337 } else {
338 console.log('## Connect tcp', sock.remoteAddress, sock.remotePort);
339 sock.setNoDelay();
340 }
341 sock.on('end', function () {
342 handleClose(sock);
343 });
344 sock.on('error', function sockError(error) {
345 console.log('## connection error', error.stack);
346 handleClose(sock);
347 });
348 sock.pipe(new SkaleClient.FromGrid()).pipe(new SwitchBoard(sock));
349}
350
351function getClientNumber() {
352 var n = 100000;
353 do {
354 clientNum = (clientNum < clientMax) ? clientNum + 1 : 2;
355 } while (clientNum in crossbar && --n);
356 if (!n) throw new Error('getClientNumber failed');
357 return clientNum;
358}
359
360function register(from, msg, sock)
361{
362 var uuid = msg.uuid || uuidGen.v1();
363 sock.client = clients[uuid] = {
364 index: sock.index,
365 uuid: uuid,
366 owner: from ? from : uuid,
367 data: msg.data || {},
368 sock: sock,
369 topics: {},
370 closeListeners: {}
371 };
372 pubmon({event: 'connect', uuid: uuid, data: msg.data});
373 //msg.data = {uuid: uuid, token: 0, id: sock.index};
374 if (!msg.data) msg.data = {};
375 msg.data.uuid = uuid;
376 msg.data.token = 0;
377 msg.data.id = sock.index;
378}
379
380function devices(msg) {
381 var query = msg.data.query, result = [];
382
383 for (var i in clients) {
384 if (!clients[i].sock) continue;
385 var match = true;
386 for (var j in query) {
387 if (!clients[i].data || clients[i].data[j] != query[j]) {
388 match = false;
389 break;
390 }
391 }
392 if (match) {
393 result.push({
394 uuid: i,
395 id: clients[i].index,
396 ip: clients[i].sock.remoteAddress,
397 data: clients[i].data
398 });
399 }
400 }
401 return result;
402}
403
404function getTopicId(topic) {
405 if (topic in topicIndex) return topicIndex[topic];
406 var n = 10000;
407 do {
408 topicNum = (topicNum < topicMax) ? topicNum + 1 : 0;
409 } while (topicNum in topics && --n);
410 if (!n) throw new Error('getTopicId failed');
411 topics[topicNum] = {name: topic, id: topicNum, sub: []};
412 topicIndex[topic] = topicNum;
413 return topicIndex[topic];
414}
415
416function subscribe(client, topic) {
417 var sub = topics[getTopicId(topic)].sub;
418 if (sub.indexOf(client.index) < 0)
419 sub.push(client.index);
420}
421
422function unsubscribe(client, topic) {
423 if (!(topic in topicIndex)) return;
424 var sub = topics[topicIndex[topic]].sub, i = sub.indexOf(client.index);
425 if (i >= 0) sub.splice(i, 1);
426}