UNPKG

6.09 kBJavaScriptView Raw
1#!/usr/bin/env node
2
3// Copyright 2016 Luca-SAS, licensed under the Apache License 2.0
4
5'use strict';
6
7var child_process = require('child_process');
8var fs = require('fs');
9var os = require('os');
10var cluster = require('cluster');
11var mkdirp = require('mkdirp');
12var uuid = require('node-uuid');
13var trace = require('line-trace');
14
15var SkaleClient = require('../lib/client.js');
16var Lines = require('../lib/lines.js');
17var sizeOf = require('../lib/rough-sizeof.js');
18var readSplit = require('../lib/readsplit.js').readSplit;
19
20//var global = {require: require};
21
22var opt = require('node-getopt').create([
23 ['h', 'help', 'print this help text'],
24 ['d', 'debug', 'print debug traces'],
25 ['m', 'memory=ARG', 'set max memory in MB for workers (default 1024)'],
26 ['M', 'MyHost=ARG', 'advertised hostname'],
27 ['n', 'nworker=ARG', 'number of workers (default: number of cpus)'],
28 ['H', 'Host=ARG', 'server hostname (default localhost)'],
29 ['P', 'Port=ARG', 'server port (default 12346)'],
30 ['V', 'version', 'print version']
31]).bindHelp().parseSystem();
32
33if (opt.options.version) {
34 var pkg = require('../package');
35 return console.log(pkg.name + '-' + pkg.version);
36}
37
38var debug = opt.options.debug || false;
39var ncpu = Number(opt.options.nworker) || (process.env.SKALE_WORKER_PER_HOST ? process.env.SKALE_WORKER_PER_HOST : os.cpus().length);
40var hostname = opt.options.MyHost || os.hostname();
41var memory = Number(opt.options.memory || 1024);
42var cgrid;
43var mm = new MemoryManager(memory);
44ncpu = Number(ncpu);
45
// Entry point: the same script runs both as the cluster master (the worker controller)
// and, once forked by the master, as an individual worker process.
if (!cluster.isMaster) {
  // Forked child: connect to the server as a worker.
  runWorker(opt.options.Host, opt.options.Port);
} else {
  process.title = 'skale-worker-controller';
  // Every forked worker gets its V8 old-space limit set from the -m option.
  cluster.setupMaster({execArgv: ['--max_old_space_size=' + memory]});
  cluster.on('exit', handleExit);

  // Identity advertised to the server by this controller.
  var controllerData = {
    type: 'worker-controller',
    hostname: hostname,
    ncpu: ncpu
  };
  cgrid = new SkaleClient({
    debug: debug,
    host: opt.options.Host,
    port: opt.options.Port,
    data: controllerData
  });

  // Workers are forked on connection and again on each 'getWorker' event;
  // the controller exits when its connection closes.
  cgrid.on('connect', startWorkers);
  cgrid.on('getWorker', startWorkers);
  cgrid.on('close', process.exit);
  console.log('worker controller ready');
}
67
/**
 * Fork a batch of worker processes and serve the cleanup requests they send back.
 *
 * Each worker is forked with `wsid` (taken from msg.wsid) and `rank` (its index in the
 * batch) in its environment. A worker may later send {cmd: 'rm', dir: <name>} over the
 * cluster IPC channel; the controller then removes /tmp/skale/<name>, at most once per
 * directory name for this batch.
 *
 * @param {Object} msg - event payload; msg.n (optional) is the number of workers to fork
 *   (defaults to ncpu) and msg.wsid is passed through to the workers.
 */
function startWorkers(msg) {
  var baseDir = '/tmp/skale/';
  var worker = [];
  // No prototype, so directory names such as "constructor" are not reported as already removed.
  var removed = Object.create(null);
  var n = msg.n || ncpu;

  // The name comes from a worker process and ends up in an `rm -rf` argument: only accept
  // a single path component so the removal can never escape baseDir.
  function isSafeDirName(name) {
    return name !== '.' && name !== '..' &&
      name.indexOf('/') === -1 && name.indexOf('\0') === -1;
  }

  function onWorkerMessage(msg) {
    switch (msg.cmd) {
    case 'rm':
      if (!msg.dir || removed[msg.dir]) break;
      var dir = String(msg.dir);
      if (!isSafeDirName(dir)) {
        console.log('refusing to remove unsafe directory', msg.dir);
        break;
      }
      removed[msg.dir] = true;
      trace('remove /tmp/skale/' + dir);
      // execFile passes the path as a single argv entry: no shell is involved.
      child_process.execFile('/bin/rm', ['-rf', baseDir + dir], function (err) {
        if (err) console.log('failed to remove ' + baseDir + dir, err);
      });
      break;
    default:
      console.log('unexpected msg', msg);
    }
  }

  for (var i = 0; i < n; i++)
    worker[i] = cluster.fork({wsid: msg.wsid, rank: i});
  worker.forEach(function (w) {
    w.on('message', onWorkerMessage);
  });
}
89
/**
 * cluster 'exit' listener: log which worker process ended and why.
 *
 * @param {Object} worker - cluster worker; only worker.process.pid is read.
 * @param {number} code - exit code, reported when no signal is given.
 * @param {string} signal - name of the terminating signal, if any; takes precedence over code.
 */
function handleExit(worker, code, signal) {
  var pid = worker.process.pid;
  var reason = signal ? signal : code;
  console.log('worker pid', pid, ', exited:', reason);
}
93
/**
 * Body of a forked worker process: connect to the server, then execute the tasks it sends.
 *
 * The worker registers itself with type 'worker' and the wsid received from the controller
 * through its environment. It answers 'request' messages by dispatching on msg.data.cmd
 * (only 'runTask' is defined here), streams files on 'sendFile', and on 'remoteClose' asks
 * the controller to remove the directory of the last context it worked for, then exits.
 *
 * @param {string} host - server hostname, passed to SkaleClient.
 * @param {string|number} port - server port, passed to SkaleClient.
 */
function runWorker(host, port) {
  // contextId of the most recent task; reported to the controller for cleanup on close.
  var contextId;

  process.title = 'skale-worker_' + process.env.wsid + '_' + process.env.rank;
  process.on('uncaughtException', function (err) {
    var stack = (err && err.stack) ? err.stack : String(err);
    // grid is still undefined if the exception was raised while constructing the client;
    // in that case there is nobody to notify, so at least keep a local trace.
    if (grid) grid.send(grid.muuid, {cmd: 'workerError', args: stack});
    else console.error(stack);
    process.exit(2);
  });

  var grid = new SkaleClient({
    debug: debug,
    host: host,
    port: port,
    data: {
      ncpu: os.cpus().length,
      os: os.type(),
      arch: os.arch(),
      usedmem: process.memoryUsage().rss,
      totalmem: os.totalmem(),
      hostname: hostname,
      type: 'worker',
      wsid: process.env.wsid,
      jobId: ''
    }
  }, function (err, res) {
    // Without this guard a failed connection surfaced as a TypeError on res.id.
    // Same handling as the 'error' event below.
    if (err) {
      console.log('grid error', err);
      process.exit(2);
    }
    console.log('id: ', res.id, 'uuid: ', res.uuid);
    grid.host = {uuid: res.uuid, id: res.id};
    grid.workerHost = {};
  });

  grid.on('error', function (err) {
    console.log('grid error', err);
    process.exit(2);
  });

  // Handlers for incoming requests, keyed by msg.data.cmd.
  var request = {
    runTask: function runTask(msg) {
      // Remember the master so that uncaught errors can be reported to it.
      grid.muuid = msg.data.master_uuid;
      var task = parseTask(msg.data.args);
      contextId = task.contextId;
      // set worker side dependencies
      task.mm = mm;
      task.lib = {sizeOf: sizeOf, fs: fs, readSplit: readSplit, Lines: Lines, task: task, mkdirp: mkdirp, uuid: uuid, trace: trace};
      task.grid = grid;
      task.run(function (result) {grid.reply(msg, null, result);});
    }
  };

  grid.on('remoteClose', function () {
    process.send({cmd: 'rm', dir: contextId});
    process.exit();
  });

  grid.on('request', function (msg) {
    if (msg.first) {
      // msg.first, when present, lists the workers; keep their hostnames by index.
      for (var i = 0; i < msg.first.length; i++)
        grid.workerHost[i] = msg.first[i].hostname;
    }
    try {
      // Only dispatch to handlers defined above: an inherited member such as 'toString'
      // would otherwise be invoked and the request would never be answered.
      if (!Object.prototype.hasOwnProperty.call(request, msg.data.cmd))
        throw new Error('unknown request command: ' + msg.data.cmd);
      request[msg.data.cmd](msg);
    } catch (error) {
      console.error(error.stack);
      grid.reply(msg, error, null);
    }
  });

  grid.on('sendFile', function (msg) {
    fs.createReadStream(msg.path, msg.opt).pipe(grid.createStreamTo(msg));
  });
}
163
/**
 * Per-process accounting of memory used for storage, shuffle and collect, plus a registry
 * of partitions keyed by "<datasetId>.<partitionIndex>".
 *
 * The counters (storageMemory, shuffleMemory, collectMemory) are plain public fields that
 * start at 0; nothing in this constructor updates them. The *Full() predicates compare
 * them against fixed shares of the budget: 40% for storage, 20% each for shuffle and collect.
 *
 * @param {number} memory - memory size in MB; 100 is subtracted from it before the
 *   shares are computed.
 */
function MemoryManager(memory) {
  var BYTES_PER_MB = 1024 * 1024;
  var budget = (memory - 100) * BYTES_PER_MB;
  var storageLimit = budget * 0.4;
  var shuffleLimit = budget * 0.2;
  var collectLimit = budget * 0.2;

  // Registry key of a partition.
  function keyOf(partition) {
    return partition.datasetId + '.' + partition.partitionIndex;
  }

  this.storageMemory = 0;
  this.shuffleMemory = 0;
  this.collectMemory = 0;
  this.sizeOf = sizeOf;

  this.storageFull = function () {
    return this.storageMemory > storageLimit;
  };
  this.shuffleFull = function () {
    return this.shuffleMemory > shuffleLimit;
  };
  this.collectFull = function () {
    return this.collectMemory > collectLimit;
  };

  this.partitions = {};

  // Record the partition unless its key is already present in the registry.
  this.register = function (partition) {
    var id = keyOf(partition);
    if (id in this.partitions) return;
    this.partitions[id] = partition;
  };

  // NOTE(review): the key stays in the registry with an undefined value, so a later
  // register() for the same key is a no-op -- confirm this is intended.
  this.unregister = function (partition) {
    this.partitions[keyOf(partition)] = undefined;
  };

  // True when a non-null, non-undefined entry is stored for the partition's key.
  this.isAvailable = function (partition) {
    var entry = this.partitions[keyOf(partition)];
    return !(entry == undefined);
  };
}
194
/**
 * Deserialize a task: parse the JSON text and turn string values that hold function
 * source code back into callable functions.
 *
 * Two forms are revived:
 *  - "function [name](args) {body}" is rebuilt with new Function(args, body);
 *  - ES6 arrow functions, "(args) => ..." or "arg => ...", are evaluated with an
 *    indirect (global scope) eval.
 * Every other value, including ordinary strings that merely begin with the word
 * "function", is returned unchanged.
 *
 * SECURITY: this executes code contained in `str`. It must only be fed task
 * descriptions coming from a trusted master.
 *
 * @param {string} str - JSON text of the serialized task.
 * @returns {*} the parsed value with function sources replaced by functions.
 * @throws {SyntaxError} if str is not valid JSON or a revived function body is invalid.
 */
function parseTask(str) {
  // Keyword, optional name, argument list (captured) and opening brace. Requiring this
  // whole shape avoids a TypeError on plain strings such as "functional tests".
  var functionSource = /^function\b[^(]*\(([^)]*)\)\s*\{/;

  return JSON.parse(str, function (key, value) {
    if (typeof value !== 'string') return value;

    // String value can be a regular function or an ES6 arrow function
    var header = value.match(functionSource);
    if (header) {
      // Drop the header and the closing brace, tolerating trailing whitespace after it.
      var body = value.slice(header[0].length).replace(/}\s*$/, '');
      return new Function(header[1], body);
    }
    if (value.match(/^\s*\(\s*[^(][^)]*\)\s*=>/) || value.match(/^\s*\w+\s*=>/))
      return ('indirect', eval)(value);
    return value;
  });
}