UNPKG

4.92 kBJavaScriptView Raw
1'use strict';
2
3var http = require('http');
4
// Matches a canonical 8-4-4-4-12 hexadecimal UUID whose version digit is 1-4
// (case-insensitive). getReadStream() uses it to decide whether a file's
// `host` is a skale node identifier rather than an HTTP host name.
var uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-4][0-9a-f]{3}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
6
7module.exports = Task;
8
/**
 * Worker-side description of one unit of work on a single partition.
 *
 * Only the fields copied from `init` and an empty `files` map are set here.
 * The following properties are NOT set by the constructor and are expected to
 * be attached to the instance afterwards:
 *   - lib:  handler to libraries required on worker side (which cannot be serialized)
 *   - mm:   handler to worker side memory manager instance
 *   - grid: handler to socket object instance
 * run() additionally reads `env`, `log` and `exportFile`, which are likewise
 * not assigned here.
 *
 * @constructor
 * @param {Object} init
 * @param {string} init.basedir - directory prefix; run() appends 'shuffle' and
 *   'export' to it directly, so it is used as-is (no separator is inserted).
 * @param {*} init.datasetId - key into `init.nodes` of the dataset to process.
 * @param {*} init.pid - index of the partition to process.
 * @param {Object} init.nodes - datasets indexed by dataset id.
 * @param {Object} [init.action] - action to apply; when falsy, run() works on
 *   the dataset's shuffle partitions instead.
 */
function Task(init) {
  this.basedir = init.basedir;
  this.datasetId = init.datasetId;
  this.pid = init.pid;
  this.nodes = init.nodes;
  this.action = init.action;
  // Object in which we store shuffle file information to be sent back to master.
  this.files = {};
}
21
/**
 * Execute this task on its partition and report the outcome through `done`.
 *
 * Steps, in order:
 *  1. Select the starting partition: `partitions[pid]` of the target dataset
 *     when an action is set, `shufflePartitions[pid]` otherwise.
 *  2. Create the `shuffle` and `export` directories under `basedir`.
 *  3. Copy `this.env` (if any) into `process.env`; falsy values delete the key.
 *  4. Build the pipeline back to front: the action step last, then, walking up
 *     the parent chain, each dataset (and each persistent partition to cache)
 *     is prepended until a partition available in the memory manager, or one
 *     without `parentDatasetId`, is reached.
 *  5. Iterate from that source, from memory if available, else from its dataset.
 *  6. When iteration ends, register the collected blocks and call `done`.
 *
 * Requires `this.lib`, `this.mm` (and `this.log` when `this.env` is set) to have
 * been attached to the task beforehand.
 *
 * @param {function(Object)} done - called exactly once with one of:
 *   `{data: {host, path}}` when the action defines `opt._postIterate`,
 *   `{data: action.init}` for any other action,
 *   `{pid, files}` when there is no action (after `spillToDisk`).
 */
Task.prototype.run = function (done) {
  var self = this;
  var mm = this.mm;
  var action = this.action;
  var pipeline = [];
  var blocksToRegister = [];
  var target = this.nodes[this.datasetId];
  var tmpPart = action ? target.partitions[this.pid] : target.shufflePartitions[this.pid];
  var tmpDataset = this.nodes[tmpPart.datasetId];
  var tmpPartAvailable;
  var env = this.env;

  this.lib.mkdirp.sync(this.basedir + 'shuffle');
  this.lib.mkdirp.sync(this.basedir + 'export');

  // Propagate environment settings from master.
  // Object.keys() restricts this to the object's own keys, so enumerable
  // properties inherited through the prototype chain never leak into process.env.
  if (env) {
    this.log('env:', env);
    Object.keys(env).forEach(function (name) {
      if (env[name]) process.env[name] = env[name];
      else delete process.env[name];
    });
  }

  // The action is the last pipeline stage: either a per-element side effect
  // (foreach) or a fold of every element into action.init (aggregate).
  if (action) {
    if (action.opt._foreach) {
      pipeline.push({transform: function foreach(context, data) {
        for (var i = 0; i < data.length; i++) action.src(data[i], action.opt, self);
      }});
    } else {
      pipeline.push({transform: function aggregate(context, data) {
        for (var i = 0; i < data.length; i++) {
          action.init = action.src(action.init, data[i], action.opt, self);
        }
      }});
    }
  }

  // Walk up the lineage, prepending stages, until a source partition is found.
  for (;;) {
    tmpPartAvailable = mm.isAvailable(tmpPart); // is partition available in memory
    if (!tmpPartAvailable && tmpDataset.persistent) { // if data must be stored in memory
      // No persist if no action and shuffleRDD (i.e. the target dataset itself).
      // The id comparison stays loose on purpose: the original compared with `!=`.
      if ((action != null) || (tmpDataset.id != this.datasetId)) {
        blocksToRegister.push(tmpPart); // register block inside memory manager once iteration is done
        pipeline.unshift(tmpPart); // add it to pipeline
        tmpPart.mm = this.mm;
      }
    }
    if (tmpPartAvailable || (tmpPart.parentDatasetId == null)) break; // source partition found
    pipeline.unshift(tmpDataset); // else add current dataset transform to pipeline
    tmpPart = this.nodes[tmpPart.parentDatasetId].partitions[tmpPart.parentPartitionIndex];
    tmpDataset = this.nodes[tmpPart.datasetId];
  }

  // Post-iterate actions
  function iterateDone() {
    blocksToRegister.forEach(function (block) {
      mm.register(block);
    });
    if (!action) {
      self.nodes[self.datasetId].spillToDisk(self, function () {
        done({pid: self.pid, files: self.files});
      });
      return;
    }
    if (action.opt._postIterate) {
      action.opt._postIterate(action.init, action.opt, self, tmpPart.partitionIndex, function () {
        done({data: {host: self.grid.host.uuid, path: self.exportFile}});
      });
      return;
    }
    done({data: action.init});
  }

  // Pre-iterate actions
  if (action && action.opt._preIterate) {
    action.opt._preIterate(action.opt, this, tmpPart.partitionIndex);
  }

  // Iterate actions: read from memory when the source partition is cached,
  // otherwise let the owning dataset produce it.
  if (tmpPartAvailable) {
    mm.partitions[tmpPart.datasetId + '.' + tmpPart.partitionIndex].iterate(this, tmpPart.partitionIndex, pipeline, iterateDone);
  } else {
    this.nodes[tmpPart.datasetId].iterate(this, tmpPart.partitionIndex, pipeline, iterateDone);
  }
};
94
/**
 * Get a readable stream for a shuffle or source file.
 *
 * Resolution order:
 *  1. If `fileObj.path` exists on the local filesystem, read it from there.
 *  2. Otherwise default `fileObj.host` to the master (`this.grid.muuid`) when
 *     it is not set. Note this writes the default back onto `fileObj`.
 *  3. If the host is a UUID, stream through the grid with a 'sendFile' command.
 *  4. Otherwise fetch `http://<host><path>` and hand back the HTTP response.
 *
 * @param {{path: string, host: (string|undefined)}} fileObj - file location.
 * @param {Object} opt - options forwarded to `fs.createReadStream` or to the
 *   'sendFile' command; not used for the HTTP case.
 * @param {function(?Error, Object=)} done - receives `(null, stream)` on
 *   success, or `(err)` when the HTTP request fails.
 */
Task.prototype.getReadStream = function (fileObj, opt, done) {
  var fs = this.lib.fs;
  var finished = false;

  // Guarantee a single callback even if both a response and an error show up.
  function finish(err, stream) {
    if (finished) return;
    finished = true;
    done(err, stream);
  }

  if (fs.existsSync(fileObj.path)) return done(null, fs.createReadStream(fileObj.path, opt));
  // Default host is master
  if (!fileObj.host) fileObj.host = this.grid.muuid;
  if (uuidPattern.test(fileObj.host)) {
    return done(null, this.grid.createStreamFrom(fileObj.host, {cmd: 'sendFile', path: fileObj.path, opt: opt}));
  }
  var url = 'http://' + fileObj.host + fileObj.path;
  // Without an 'error' listener a refused or reset connection would be thrown
  // as an unhandled 'error' event and `done` would never be called.
  http.get(url, function (res) {
    finish(null, res);
  }).on('error', finish);
};
111
/**
 * Same as getReadStream above, but returns the stream synchronously.
 * This may be more expensive, as it requires an additional pass-through stream.
 *
 * A file present on the local filesystem is read directly. Anything else is
 * requested through the grid with a 'sendFile' command, whatever the host
 * looks like (there is no HTTP path here). When `fileObj.host` is not set it
 * is defaulted to the master (`this.grid.muuid`) and written back onto
 * `fileObj`.
 *
 * @param {{path: string, host: (string|undefined)}} fileObj - file location.
 * @param {Object} opt - options forwarded to `fs.createReadStream` or to the
 *   'sendFile' command.
 * @returns {Object} the stream returned by `fs.createReadStream` or by
 *   `this.grid.createStreamFrom`.
 */
Task.prototype.getReadStreamSync = function (fileObj, opt) {
  var fs = this.lib.fs;
  if (fs.existsSync(fileObj.path)) return fs.createReadStream(fileObj.path, opt);
  // Default host is master
  if (!fileObj.host) fileObj.host = this.grid.muuid;
  return this.grid.createStreamFrom(fileObj.host, {cmd: 'sendFile', path: fileObj.path, opt: opt});
};