1 | #!/usr/bin/env node
|
2 |
|
3 |
|
4 |
|
5 | 'use strict';
|
6 |
|
7 | var child_process = require('child_process');
|
8 | var fs = require('fs');
|
9 | var os = require('os');
|
10 | var cluster = require('cluster');
|
11 | var mkdirp = require('mkdirp');
|
12 | var uuid = require('node-uuid');
|
13 | var trace = require('line-trace');
|
14 |
|
15 | var SkaleClient = require('../lib/client.js');
|
16 | var Lines = require('../lib/lines.js');
|
17 | var sizeOf = require('../lib/rough-sizeof.js');
|
18 | var readSplit = require('../lib/readsplit.js').readSplit;
|
19 |
|
20 |
|
21 |
|
22 | var opt = require('node-getopt').create([
|
23 | ['h', 'help', 'print this help text'],
|
24 | ['d', 'debug', 'print debug traces'],
|
25 | ['m', 'memory=ARG', 'set max memory in MB for workers (default 1024)'],
|
26 | ['M', 'MyHost=ARG', 'advertised hostname'],
|
27 | ['n', 'nworker=ARG', 'number of workers (default: number of cpus)'],
|
28 | ['H', 'Host=ARG', 'server hostname (default localhost)'],
|
29 | ['P', 'Port=ARG', 'server port (default 12346)'],
|
30 | ['V', 'version', 'print version']
|
31 | ]).bindHelp().parseSystem();
|
32 |
|
33 | if (opt.options.version) {
|
34 | var pkg = require('../package');
|
35 | return console.log(pkg.name + '-' + pkg.version);
|
36 | }
|
37 |
|
38 | var debug = opt.options.debug || false;
|
39 | var ncpu = Number(opt.options.nworker) || (process.env.SKALE_WORKER_PER_HOST ? process.env.SKALE_WORKER_PER_HOST : os.cpus().length);
|
40 | var hostname = opt.options.MyHost || os.hostname();
|
41 | var memory = Number(opt.options.memory || 1024);
|
42 | var cgrid;
|
43 | var mm = new MemoryManager(memory);
|
44 | ncpu = Number(ncpu);
|
45 |
|
46 | if (cluster.isMaster) {
|
47 | process.title = 'skale-worker-controller';
|
48 | cluster.setupMaster({execArgv: ['--max_old_space_size=' + memory]});
|
49 | cluster.on('exit', handleExit);
|
50 | cgrid = new SkaleClient({
|
51 | debug: debug,
|
52 | host: opt.options.Host,
|
53 | port: opt.options.Port,
|
54 | data: {
|
55 | type: 'worker-controller',
|
56 | hostname: hostname,
|
57 | ncpu: ncpu
|
58 | }
|
59 | });
|
60 | cgrid.on('connect', startWorkers);
|
61 | cgrid.on('getWorker', startWorkers);
|
62 | cgrid.on('close', process.exit);
|
63 | console.log('worker controller ready');
|
64 | } else {
|
65 | runWorker(opt.options.Host, opt.options.Port);
|
66 | }
|
67 |
|
/**
 * Fork a batch of worker processes and serve the clean-up requests they send back.
 *
 * Registered as listener of the controller's 'connect' and 'getWorker' events.
 *
 * @param {Object} [msg] - event payload. `msg.n` is the number of workers to fork
 *   (defaults to the module-level `ncpu`); `msg.wsid` is handed to every worker
 *   through its environment, together with its rank in the batch.
 */
function startWorkers(msg) {
  var options = msg || {};            // tolerate an event emitted without payload
  var removed = Object.create(null);  // directories already removed for this batch
  var n = options.n || ncpu;

  // Remove /tmp/skale/<dir>, at most once per batch of workers.
  function removeContextDir(dir) {
    if (!dir) return;
    var name = String(dir);
    if (removed[name]) return;
    removed[name] = true;
    // The name ends up in the argument of `rm -rf`: only accept a single path
    // component so that a bogus message cannot reach outside of /tmp/skale.
    if (name === '.' || name === '..' || name.indexOf('/') !== -1) {
      console.error('refusing to remove invalid directory name', name);
      return;
    }
    trace('remove /tmp/skale/' + name);
    child_process.execFile('/bin/rm', ['-rf', '/tmp/skale/' + name], function (err) {
      if (err) console.error('could not remove /tmp/skale/' + name + ':', err.message);
    });
  }

  // Dispatch a message received from one of the workers.
  function onWorkerMessage(message) {
    switch (message.cmd) {
    case 'rm':
      removeContextDir(message.dir);
      break;
    default:
      console.log('unexpected msg', message);
    }
  }

  for (var i = 0; i < n; i++) {
    var worker = cluster.fork({wsid: options.wsid, rank: i});
    worker.on('message', onWorkerMessage);
  }
}
|
89 |
|
/**
 * Listener for the cluster 'exit' event: logs which worker process ended and why.
 *
 * @param {Object} worker - worker that exited; its `process.pid` is logged
 * @param {number} code - exit code, logged when no signal is given
 * @param {string} signal - name of the signal that ended the worker, if any
 */
function handleExit(worker, code, signal) {
  var pid = worker.process.pid;
  var reason = signal ? signal : code;
  console.log('worker pid', pid, ', exited:', reason);
}
|
93 |
|
/**
 * Entry point of a forked worker process: connects to the server and runs the
 * tasks it receives until the remote side closes.
 *
 * @param {string} [host] - server hostname, handed to SkaleClient unchanged
 * @param {string|number} [port] - server port, handed to SkaleClient unchanged
 */
function runWorker(host, port) {
  // Context of the last task received; names the directory removed on 'remoteClose'.
  var contextId;

  process.title = 'skale-worker_' + process.env.wsid + '_' + process.env.rank;
  // Report any unexpected failure to the master of the current task, then give up.
  process.on('uncaughtException', function (err) {
    grid.send(grid.muuid, {cmd: 'workerError', args: err.stack});
    process.exit(2);
  });

  var grid = new SkaleClient({
    debug: debug,
    host: host,
    port: port,
    data: {
      ncpu: os.cpus().length,
      os: os.type(),
      arch: os.arch(),
      usedmem: process.memoryUsage().rss,
      totalmem: os.totalmem(),
      hostname: hostname,
      type: 'worker',
      wsid: process.env.wsid,
      jobId: ''
    }
  }, function (err, res) {
    // Without this check a failed connection surfaced as a TypeError on `res.id`.
    if (err) {
      console.log('grid error', err);
      process.exit(2);
      return;
    }
    console.log('id: ', res.id, 'uuid: ', res.uuid);
    grid.host = {uuid: res.uuid, id: res.id};
    grid.workerHost = {};
  });

  grid.on('error', function (err) {
    console.log('grid error', err);
    process.exit(2);
  });

  // Handlers of the commands accepted through 'request' messages, keyed by command name.
  var request = {
    runTask: function runTask(msg) {
      grid.muuid = msg.data.master_uuid;
      var task = parseTask(msg.data.args);
      contextId = task.contextId;

      // Give the task access to the memory manager, helper modules and connection.
      task.mm = mm;
      task.lib = {sizeOf: sizeOf, fs: fs, readSplit: readSplit, Lines: Lines, task: task, mkdirp: mkdirp, uuid: uuid, trace: trace};
      task.grid = grid;
      task.run(function (result) {grid.reply(msg, null, result);});
    }
  };

  grid.on('remoteClose', function () {
    // Exit only once the clean-up request has been handed over to the IPC channel:
    // exiting right after process.send() may drop the message.
    process.send({cmd: 'rm', dir: contextId}, function () {
      process.exit();
    });
  });

  grid.on('request', function (msg) {
    if (msg.first) {
      for (var i = 0; i < msg.first.length; i++)
        grid.workerHost[i] = msg.first[i].hostname;
    }
    try {
      var cmd = msg.data.cmd;
      // Own properties only: a command such as 'toString' must be answered with an
      // error instead of silently running an inherited method and never replying.
      if (!Object.prototype.hasOwnProperty.call(request, cmd))
        throw new Error('unknown request command: ' + cmd);
      request[cmd](msg);
    } catch (error) {
      console.error(error.stack);
      grid.reply(msg, error, null);
    }
  });

  // Stream a local file to the requester.
  grid.on('sendFile', function (msg) {
    fs.createReadStream(msg.path, msg.opt).pipe(grid.createStreamTo(msg));
  });
}
|
163 |
|
/**
 * Bookkeeping of the memory used by a worker and of the partitions it holds.
 *
 * Out of `memory` megabytes, 100 MB are set aside and the rest is split into
 * three budgets: 40% for storage, 20% for shuffle and 20% for collect. The
 * counters `storageMemory`, `shuffleMemory` and `collectMemory` are public and
 * start at 0; this constructor never updates them itself.
 *
 * @constructor
 * @param {number} memory - total memory budget, in megabytes
 */
function MemoryManager(memory) {
  var Kb = 1024, Mb = 1024 * Kb;
  var MAX_MEMORY = (memory - 100) * Mb;
  var maxStorageMemory = MAX_MEMORY * 0.4;
  var maxShuffleMemory = MAX_MEMORY * 0.2;
  var maxCollectMemory = MAX_MEMORY * 0.2;

  // Registry key of a partition: "<datasetId>.<partitionIndex>".
  function partitionKey(partition) {
    return partition.datasetId + '.' + partition.partitionIndex;
  }

  this.storageMemory = 0;
  this.shuffleMemory = 0;
  this.collectMemory = 0;
  this.sizeOf = sizeOf;

  // Each predicate tells whether the matching counter exceeds its budget.
  this.storageFull = function () {return (this.storageMemory > maxStorageMemory);};
  this.shuffleFull = function () {return (this.shuffleMemory > maxShuffleMemory);};
  this.collectFull = function () {return (this.collectMemory > maxCollectMemory);};

  // Registered partitions, by registry key.
  this.partitions = {};

  /** Record `partition` unless one is already registered under the same key. */
  this.register = function (partition) {
    var key = partitionKey(partition);
    // Compare the value rather than using `in`: a slot left undefined must not
    // prevent the partition from being registered again.
    if (this.partitions[key] == null) this.partitions[key] = partition;
  };

  /** Forget `partition`; it can be registered again afterwards. */
  this.unregister = function (partition) {
    delete this.partitions[partitionKey(partition)];
  };

  /** @returns {boolean} true when `partition` is currently registered. */
  this.isAvailable = function (partition) {
    return (this.partitions[partitionKey(partition)] != null);
  };
}
|
194 |
|
/**
 * Deserialize a task from JSON, turning string values that hold the source of a
 * function (classic `function (...) {...}` or arrow function) back into functions.
 *
 * NOTE(security): revived values are compiled with `new Function` / `eval`, i.e.
 * the payload is executed as code. Only parse tasks coming from a trusted peer.
 *
 * @param {string} str - JSON text of the task
 * @returns {*} the parsed value, with function sources replaced by functions
 * @throws {SyntaxError} when `str` is not valid JSON or a function source does not compile
 */
function parseTask(str) {
  // Source of a classic function: captures the parameter list and the body.
  // Trailing whitespace after the closing brace is tolerated.
  var functionSource = /^function\s*[^(]*\(([^)]*)\)\s*\{([\s\S]*)\}\s*$/;
  // Start of an arrow function: `() =>`, `(a, b) =>` or `a =>`.
  var arrowSource = /^\s*(?:\(\s*\)|\(\s*[^(][^)]*\)|\w+)\s*=>/;
  // Calling eval through another name makes it an indirect eval (global scope).
  var globalEval = eval;

  return JSON.parse(str, function (key, value) {
    if (typeof value !== 'string') return value;

    if (value.substring(0, 8) === 'function') {
      var parts = functionSource.exec(value);
      // Ordinary text that merely starts with "function" is data: keep it as is.
      if (parts) value = new Function(parts[1], parts[2]);
    } else if (arrowSource.test(value)) {
      value = globalEval(value);
    }
    return value;
  });
}
|