UNPKG

11.6 kB JavaScript View Raw
1// Copyright 2016 Luca-SAS, licensed under the Apache License 2.0
2
3'use strict';
4
5var fs = require('fs');
6var util = require('util');
7var url = require('url');
8var zlib = require('zlib');
9var uuid = require('uuid');
10var mkdirp = require('mkdirp');
11
12var SkaleClient = require('./client.js');
13var dataset = require('./dataset.js');
14var Task = require('./task.js');
15
// SkaleContext inherits SkaleClient's prototype methods.
util.inherits(SkaleContext, SkaleClient);
module.exports = SkaleContext;

// Reference time taken at module load; the debug logger reports the seconds
// elapsed since this point.
var start = process.hrtime();
21
/**
 * Master-side Skale context. Connects through SkaleClient (with
 * `data.type` forced to 'master'), records the worker devices received on
 * the first 'connect' event, exposes the dataset factories, and schedules
 * job stages as tasks on the least busy worker.
 *
 * May be called with or without `new`.
 *
 * @param {Object} [arg] - options, also passed to the SkaleClient constructor.
 * @param {Object} [arg.data] - client data; its `type` is set to 'master'.
 * @param {string} [arg.tmp] - scratch root; defaults to $SKALE_TMP, then '/tmp'.
 * @param {Object} [arg.env] - environment attached to the first task sent
 *   to each worker.
 *
 * $SKALE_WORKERS caps how many of the connected devices are used; when it is
 * unset, zero or not a number, all devices are used.
 */
function SkaleContext(arg) {
  if (!(this instanceof SkaleContext))
    return new SkaleContext(arg);
  var self = this;

  arg = arg || {};
  arg.data = arg.data || {};
  arg.data.type = 'master';
  SkaleClient.call(this, arg);

  // Environment values are strings: parse so the comparisons below are
  // numeric (the string '0' would otherwise be truthy and yield no workers).
  var nworker = parseInt(process.env.SKALE_WORKERS, 10);
  var tmp = arg.tmp || process.env.SKALE_TMP || '/tmp';

  this.started = this.ended = false;
  this.jobId = 0;
  this.env = arg.env || {};
  this.worker = [];
  this.log = log;
  this.contextId = uuid.v4(); // context id which will be used as scratch directory name

  this.basedir = tmp + '/skale/' + this.contextId + '/';
  mkdirp.sync(this.basedir + 'tmp');
  mkdirp.sync(this.basedir + 'stream');

  // First connection: keep at most nworker of the advertised devices.
  this.once('connect', function (data) {
    process.title = 'skale-master_' + data.devices[0].wsid + ' ' + __filename;
    if (!nworker || nworker > data.devices.length)
      nworker = data.devices.length;
    log('workers:', nworker);
    for (var i = 0; i < nworker; i++)
      self.worker.push(new Worker(data.devices[i]));
    self.started = true;
  });

  this.on('workerError', function workerError(msg) {
    console.error('Error from worker id %d:', msg.from);
    console.error(msg.args);
  });

  this.on('remoteClose', function getWorkerClose() {
    throw new Error('Fatal error: unexpected worker exit');
  });

  // Call `callback` once the worker list has been populated.
  this.getWorkers = function (callback) {
    if (self.started) return callback();
    self.once('connect', callback);
  };

  // Master-side handle on one worker device.
  function Worker(w) {
    this.uuid = w.uuid;
    this.id = w.id;
    this.ip = w.ip;
    this.ntask = 0; // tasks sent to this worker and not yet answered
  }

  Worker.prototype.rpc = function (cmd, args, done) {
    self.request({uuid: this.uuid, id: this.id}, {cmd: cmd, args: args}, done);
  };

  Worker.prototype.send = function (cmd, args) {
    self.send(this.uuid, {cmd: cmd, args: args});
  };

  this.on('request', function (msg) {
    // Protocol to handle stream flow control: reply when data is consumed
    if (msg.data.cmd !== 'stream') return;
    self.emit(msg.data.stream, msg.data.data, function () {
      try { self.reply(msg); } catch (err) { console.log(err); }
    });
  });

  // Read the local file msg.path (options msg.opt) and pipe it into
  // self.createStreamTo(msg) — presumably a stream back to the requester.
  this.on('sendFile', function (msg) {
    fs.createReadStream(msg.path, msg.opt).pipe(self.createStreamTo(msg));
  });

  // Runs at most once: sets {complete: 1} if workers were connected, then
  // calls _end(). Uses `self` so it also works as a detached callback.
  this.end = function () {
    if (self.ended) return;
    self.ended = true;
    if (self.started) self.set({complete: 1});
    self._end();
  };

  this.datasetIdCounter = 0; // global dataset id counter

  // Dataset factories (implemented in dataset.js).
  this.parallelize = function (localArray, nPartitions) { return dataset.parallelize(self, localArray, nPartitions); };
  this.range = function (start, end, step, nPartitions) { return dataset.range(self, start, end, step, nPartitions); };
  this.lineStream = function (stream, config) { return new dataset.Stream(self, stream, 'line', config); };
  this.objectStream = function (stream, config) { return new dataset.Stream(self, stream, 'object', config); };

  // Choose the text dataset type from the path: 's3:' URLs (trailing '/'
  // means a directory; slice(5) drops 's3://'), local directories (trailing
  // '/'), '.gz' files, otherwise a plain text file.
  this.textFile = function (file, opt, nPartitions) {
    var u = url.parse(file);

    if (u.protocol === 's3:') {
      if (file.slice(-1) === '/')
        return new dataset.TextS3Dir(self, file.slice(5, -1), opt);
      return new dataset.TextS3File(self, file.slice(5));
    }
    if (file.slice(-1) === '/')
      return new dataset.TextDir(self, file, opt);
    if (file.slice(-3) === '.gz')
      return new dataset.GzipFile(self, file);
    return new dataset.TextFile(self, file, nPartitions);
  };

  // Read stream on fileObj.path: local when the file exists on this host,
  // otherwise requested from fileObj.host with a 'sendFile' command.
  this.getReadStreamSync = function (fileObj, opt) {
    if (fs.existsSync(fileObj.path))
      return fs.createReadStream(fileObj.path, opt);
    return self.createStreamFrom(fileObj.host, {cmd: 'sendFile', path: fileObj.path, opt: opt});
  };

  /**
   * Send `task` to the least busy worker and call `callback(err, res, task)`
   * with its reply. A task whose JSON exceeds 1,000,000 characters is
   * gzipped into a file under task.basedir and only the file name is sent
   * ('runztask'). Compression and file-write errors are reported through
   * `callback`, the same as rpc errors.
   */
  this.runTask = function (task, callback) {
    var wid = getLeastBusyWorkerId(task.nodes[task.datasetId].getPreferedLocation(task.pid));
    var worker = self.worker[wid];

    // Init some environment on the worker the first time we send it a task
    if (!worker.init) {
      worker.init = true;
      task.env = self.env;
    }

    worker.ntask++;
    var str = serialize(task);

    // Single completion path: release the worker slot, then report.
    function finish(err, res) {
      worker.ntask--;
      callback(err, res, task);
    }

    if (str.length > 1000000) {
      zlib.gzip(str, {chunkSize: 65536}, function (err, zipped) {
        if (err) return finish(err);
        var filename = task.basedir + 'task-' + uuid.v4() + '.gz';
        fs.writeFile(filename, zipped, function (err) {
          if (err) return finish(err);
          rpc('runztask', wid, filename, finish);
        });
      });
    } else {
      rpc('runTask', wid, str, finish);
    }

    // Index of the worker with the fewest in-flight tasks (first on ties).
    // The preferred location argument is currently ignored.
    function getLeastBusyWorkerId(/* preferredLocation */) {
      var wid, ntask;
      for (var i = 0; i < self.worker.length; i++) {
        if (ntask === undefined || ntask > self.worker[i].ntask) {
          ntask = self.worker[i].ntask;
          wid = i;
        }
      }
      return wid;
    }

    // JSON-encode `task`, pruning data the target partition does not need.
    function serialize(task) {
      var pleft, pright, nodeId;
      var p = task.pid;
      var node = task.nodes[task.datasetId];
      var part = node.shufflePartitions ? node.shufflePartitions[p] : node.partitions[p];
      var pindex = {}; // dataset id -> partition index on this task's lineage

      // Walk through dataset ancestors to track partition dependencies
      while (part) {
        pindex[part.datasetId] = p;
        node = task.nodes[part.parentDatasetId];
        if (!node) break;
        p = part.parentPartitionIndex;
        part = node.shufflePartitions ? node.shufflePartitions[p] : node.partitions[p];
      }

      // Stringification of dataset: skip any data not relevant to the task.
      // nodeId/pleft/pright hold the values of the most recently visited
      // 'id'/'pleft'/'pright' keys, so the pruning branches below depend on
      // the order in which JSON.stringify walks the properties.
      return JSON.stringify(task, function (key, value) {
        var kept;
        if (key === 'sc') return undefined;
        if (key === 'dependencies') {
          var dep = [];
          for (var i = 0; i < value.length; i++) dep[i] = value[i].id;
          return dep;
        }
        if (key === 'pleft') pleft = value;
        else if (key === 'pright') pright = value;
        else if (key === 'id') nodeId = value;

        // For shufflePartitions (not cartesian), return only the ones used by the task.
        if (key === 'files' && value != null && !value.path) {
          kept = {};
          kept[pindex[nodeId]] = value[pindex[nodeId]];
          return kept;
        }

        // For cartesian shufflePartitions, return only the ones used by the task
        if (key === 'shufflePartitions' && value != null && value[0] && value[0].files && value[0].files.path) {
          var p1 = Math.floor(task.pid / pright);
          var p2 = task.pid % pright + pleft;
          kept = {};
          kept[task.pid] = value[task.pid];
          kept[p1] = value[p1];
          kept[p2] = value[p2];
          return kept;
        }
        return (typeof value === 'function') ? value.toString() : value;
      });
    }
  };

  /**
   * Prepare the job rooted at dataset `root`. Wait for workers, run every
   * pending shuffle stage in order, then build one result Task per partition
   * of `root` (carrying `action`) and pass them to
   * `callback({id: jobId}, tasks)`. Result tasks are not run here.
   * `opt` is not used by this function.
   */
  this.runJob = function (opt, root, action, callback) {
    var jobId = self.jobId++;

    self.getWorkers(function () {
      findShuffleStages(function (shuffleStages) {
        var cnt = 0;
        function nextStage() {
          if (cnt < shuffleStages.length) runShuffleStage(shuffleStages[cnt++], nextStage);
          else runResultStage();
        }
        nextStage();
      });
    });

    // Run one task per shuffle partition of `stage` (at most nworker at a
    // time), store each task's result files, then mark the stage executed.
    function runShuffleStage(stage, done) {
      findNodes(stage, function (nodes) {
        var tasks = [];
        var index = 0, busy = 0, complete = 0;
        var pid = 0, slot, node, i, j;

        stage.shufflePartitions = {};
        for (i = 0; i < stage.dependencies.length; i++) {
          node = stage.dependencies[i];
          for (j = 0; j < node.nPartitions; j++) {
            slot = pid++;
            // NOTE(review): the Partition is built with the counter value
            // after the increment (slot + 1) while stored under `slot`. This
            // existing offset is kept because code outside this file may rely
            // on it — confirm whether it is intended.
            stage.shufflePartitions[slot] = new dataset.Partition(stage.id, pid, node.id, node.partitions[j].partitionIndex);
          }
        }
        stage.nShufflePartitions = pid;

        for (i = 0; i < stage.nShufflePartitions; i++) {
          tasks.push(new Task({
            basedir: self.basedir,
            jobId: jobId,
            nodes: nodes,
            datasetId: stage.id,
            pid: i
          }));
        }

        function runNext() {
          while (busy < nworker && index < tasks.length) {
            busy++;
            self.runTask(tasks[index++], function (err, res, task) {
              // No error path exists for shuffle stages: surface the real
              // error instead of a TypeError on `res`.
              if (err) throw err instanceof Error ? err : new Error(String(err));
              stage.shufflePartitions[res.pid].files = res.files;
              busy--;
              complete++;
              log('part ' + task.pid + ' done. Complete: ' + complete + '/' + tasks.length);
              if (complete === tasks.length) {
                stage.executed = true;
                return done();
              }
              runNext();
            });
          }
        }

        log('start shuffle stage, partitions:', stage.nShufflePartitions);
        if (tasks.length === 0) {
          // No task completion would ever call done(): finish immediately.
          stage.executed = true;
          return done();
        }
        runNext();
      });
    }

    // Build (but do not run) one Task per partition of root, carrying `action`.
    function runResultStage() {
      findNodes(root, function (nodes) {
        var tasks = [];
        log('start result stage, partitions:', root.nPartitions);
        for (var i = 0; i < root.nPartitions; i++) {
          tasks.push(new Task({
            basedir: self.basedir,
            jobId: jobId,
            nodes: nodes,
            datasetId: root.id,
            pid: i,
            action: action
          }));
        }
        callback({id: jobId}, tasks);
      });
    }

    // Collect every dataset reachable from `node` (without descending below
    // already executed shuffle nodes) into an id -> node map, calling
    // getPartitions() on each one first.
    function findNodes(node, done) {
      var nodes = {};
      interruptibleTreewalk(node, function cin(n, next) {
        next(n.shuffling && n.executed);
      }, function cout(n, next) {
        n.getPartitions(function () {
          if (nodes[n.id] === undefined) nodes[n.id] = n;
          next();
        });
      }, function () { done(nodes); });
    }

    // Shuffle nodes under root that have not run yet. A node is unshifted
    // before its dependencies are visited, so deeper stages come first.
    function findShuffleStages(onStages) {
      var stages = [];
      interruptibleTreewalk(root, function cin(n, next) {
        if (n.shuffling && !n.executed) stages.unshift(n);
        next(n.shuffling && n.executed); // stage boundary are shuffle nodes
      }, function cout(n, next) { next(); }, function () { onStages(stages); });
    }

    // Depth-first walk. cin(n, cb) runs before n's dependencies; cb(true)
    // skips them. cout(n, done) runs once all dependencies have completed.
    function interruptibleTreewalk(n, cin, cout, done) {
      cin(n, function (uturn) {
        var deps = n.dependencies;
        if (uturn || deps.length === 0) return cout(n, done);
        var nDone = 0;
        for (var i = 0; i < deps.length; i++) {
          interruptibleTreewalk(deps[i], cin, cout, function () {
            if (++nDone === deps.length) cout(n, done);
          });
        }
      });
    }
  };

  // Send `cmd` with `args` to worker number `workerNum`, along with the
  // master uuid and the full worker list; `callback` is passed to request().
  function rpc(cmd, workerNum, args, callback) {
    self.request(self.worker[workerNum], {cmd: cmd, args: args, master_uuid: self.uuid, worker: self.worker}, callback);
  }
}
349
// Debug logger: a no-op by default. When SKALE_DEBUG is set, each call is
// written to stderr, prefixed with the seconds elapsed since `start`.
var log = function () {};
if (process.env.SKALE_DEBUG) {
  log = function () {
    var args = Array.prototype.slice.call(arguments);
    var elapsed = process.hrtime(start);
    var seconds = elapsed[0] + elapsed[1] / 1e9;
    args.unshift('[master ' + seconds.toFixed(3) + 's]');
    console.error.apply(null, args);
  };
}