1 | 'use strict';
|
2 |
|
3 | var http = require('http');
|
4 |
|
// Case-insensitive match for a canonical 8-4-4-4-12 hexadecimal UUID whose
// version nibble is 1-4. getReadStream uses it to decide whether a file's
// host is a grid peer id (UUID) or a plain HTTP host.
var uuidPattern = new RegExp(
  '^[0-9a-f]{8}-[0-9a-f]{4}-[1-4][0-9a-f]{3}-[0-9a-f]{4}-[0-9a-f]{12}$',
  'i'
);
|
6 |
|
7 | module.exports = Task;
|
8 |
|
9 |
|
/**
 * A task that processes one partition (`pid`) of dataset `datasetId`.
 *
 * @param {Object} init - task descriptor; its `basedir`, `datasetId`, `pid`,
 *   `nodes` and `action` fields are copied onto the instance unchanged.
 */
function Task(init) {
  var self = this;
  ['basedir', 'datasetId', 'pid', 'nodes', 'action'].forEach(function (key) {
    self[key] = init[key];
  });
  // Files produced by this task. Starts empty and is populated outside the
  // constructor (presumably during spillToDisk — TODO confirm).
  this.files = {};
}
|
21 |
|
/**
 * Execute this task on its partition.
 *
 * Builds a transform pipeline, walks the dataset lineage back to the first
 * partition that can be iterated, iterates it through the pipeline, then
 * reports the result through `done`:
 *  - action task with `_postIterate`: `{data: {host, path}}` pointing at
 *    `self.exportFile` on this worker's host;
 *  - action task without `_postIterate`: `{data: action.init}`, the value
 *    accumulated (or left untouched, for `_foreach`) by the action;
 *  - shuffle task (no action): the target dataset is spilled to disk and
 *    `{pid, files}` is reported.
 *
 * Relies on instance fields not set by the constructor (`mm`, `lib`, `env`,
 * `log`, `grid`, `exportFile`) — presumably assigned by the worker before
 * calling run; TODO confirm.
 *
 * @param {function(Object)} done - receives the result object described above.
 */
Task.prototype.run = function (done) {
  var pipeline = [];
  var self = this;
  var mm = this.mm;
  var action = this.action;
  var p = this.pid;
  // Action tasks read the target dataset's regular partitions; shuffle tasks
  // read its shuffle partitions.
  var tmpPart = action ? this.nodes[this.datasetId].partitions[p] : this.nodes[this.datasetId].shufflePartitions[p];
  var tmpDataset = this.nodes[tmpPart.datasetId];
  // Persistent partitions inserted in the pipeline during this run; they are
  // registered with the memory manager once iteration completes.
  var blocksToRegister = [];
  // Whether the partition reached by the lineage walk is held by `mm`.
  var tmpPartAvailable;

  this.lib.mkdirp.sync(this.basedir + 'shuffle');
  this.lib.mkdirp.sync(this.basedir + 'export');

  // Apply environment overrides to this process: a truthy value sets the
  // variable, a falsy one removes it.
  if (this.env) {
    this.log('env:', this.env);
    for (var e in this.env) {
      if (this.env[e]) process.env[e] = this.env[e];
      else delete process.env[e];
    }
  }

  // The action is pushed first; datasets are unshifted in front of it below,
  // so the action always ends up as the last pipeline stage.
  if (action) {
    if (action.opt._foreach) {
      // foreach: call action.src on every element for its side effects only.
      pipeline.push({transform: function foreach(context, data) {
        for (var i = 0; i < data.length; i++) action.src(data[i], action.opt, self);
      }});
    } else {
      // Otherwise fold every element into the accumulator action.init.
      pipeline.push({transform: function aggregate(context, data) {
        for (var i = 0; i < data.length; i++)
          action.init = action.src(action.init, data[i], action.opt, self);
      }});
    }
  }

  // Walk the lineage from the target partition towards its sources. Stop at
  // the first partition the memory manager already holds, or at one without a
  // parent dataset; that partition becomes the iteration source. Every
  // dataset passed on the way is prepended so transforms run source-first.
  for (;;) {
    tmpPartAvailable = mm.isAvailable(tmpPart);
    if (!tmpPartAvailable && tmpDataset.persistent) {
      // A persistent partition that is not yet available is inserted in the
      // pipeline itself (with access to mm) and registered afterwards —
      // presumably so it can keep the data flowing through it; TODO confirm.
      // Shuffle tasks skip this for their own target dataset.
      if ((action != undefined) || (tmpDataset.id != this.datasetId)) {
        blocksToRegister.push(tmpPart);
        pipeline.unshift(tmpPart);
        tmpPart.mm = this.mm;
      }
    }
    if (tmpPartAvailable || (tmpPart.parentDatasetId == undefined)) break;
    pipeline.unshift(tmpDataset);
    tmpPart = this.nodes[tmpPart.parentDatasetId].partitions[tmpPart.parentPartitionIndex];
    tmpDataset = this.nodes[tmpPart.datasetId];
  }

  if (action && action.opt._preIterate) {
    action.opt._preIterate(action.opt, this, tmpPart.partitionIndex);
  }

  // Iterate the memory manager's copy when available; otherwise let the
  // source dataset produce the partition.
  if (tmpPartAvailable) mm.partitions[tmpPart.datasetId + '.' + tmpPart.partitionIndex].iterate(this, tmpPart.partitionIndex, pipeline, iterateDone);
  else this.nodes[tmpPart.datasetId].iterate(this, tmpPart.partitionIndex, pipeline, iterateDone);

  // Completion callback passed to iterate: register materialized partitions,
  // then report the task result.
  function iterateDone() {
    // forEach rather than map: registration is performed only for its effect.
    blocksToRegister.forEach(function (block) { mm.register(block); });
    if (action) {
      if (action.opt._postIterate) {
        action.opt._postIterate(action.init, action.opt, self, tmpPart.partitionIndex, function () {
          done({data: {host: self.grid.host.uuid, path: self.exportFile}});
        });
      } else done({data: action.init});
    } else self.nodes[self.datasetId].spillToDisk(self, function () {
      done({pid: self.pid, files: self.files});
    });
  }
};
|
94 |
|
95 |
|
96 |
|
97 |
|
98 |
|
/**
 * Open a readable stream on a file that may be local, on a grid peer, or
 * served over HTTP.
 *
 * Resolution order:
 *  1. If `fileObj.path` exists on the local filesystem, stream it directly.
 *  2. Otherwise `fileObj.host` is used, defaulting to `this.grid.muuid`. Note
 *     that the default is written back into `fileObj`. A UUID-shaped host is
 *     treated as a grid peer and asked to `sendFile`.
 *  3. Any other host is fetched over plain HTTP as `http://<host><path>`.
 *
 * @param {Object} fileObj - `{path, host}` descriptor; `host` may be filled in.
 * @param {Object} opt - options forwarded to `createReadStream` or the grid
 *   `sendFile` command (not used for the HTTP case).
 * @param {function(?Error, Object=)} done - node-style callback. It is invoked
 *   synchronously for the local and grid cases. For the HTTP case it is
 *   invoked with the response stream, or with the request error.
 * @returns whatever `done` returns, for the local and grid cases.
 */
Task.prototype.getReadStream = function (fileObj, opt, done) {
  var fs = this.lib.fs;
  if (fs.existsSync(fileObj.path)) return done(null, fs.createReadStream(fileObj.path, opt));

  if (!fileObj.host) fileObj.host = this.grid.muuid;
  if (uuidPattern.test(fileObj.host))
    return done(null, this.grid.createStreamFrom(fileObj.host, {cmd: 'sendFile', path: fileObj.path, opt: opt}));

  var url = 'http://' + fileObj.host + fileObj.path;
  var called = false;
  // Without an 'error' listener, a failed request (DNS failure, connection
  // refused, ...) raises an unhandled 'error' event that crashes the process,
  // and `done` is never called. The `called` flag keeps `done` to a single
  // invocation. A request error arriving after the response was already
  // handed out is not reported through `done` a second time; presumably the
  // consumer observes it on the response stream (TODO confirm).
  http.get(url, function (res) {
    if (called) return;
    called = true;
    done(null, res);
  }).on('error', function (err) {
    if (called) return;
    called = true;
    done(err);
  });
};
|
111 |
|
112 |
|
113 |
|
/**
 * Synchronously return a readable stream for `fileObj`.
 *
 * A file present at `fileObj.path` on the local filesystem is streamed
 * directly. Otherwise the file is requested from a grid peer: `fileObj.host`,
 * defaulting to (and written back as) `this.grid.muuid`. Unlike
 * getReadStream, there is no HTTP fallback here.
 *
 * @param {Object} fileObj - `{path, host}` descriptor; `host` may be filled in.
 * @param {Object} opt - options forwarded to `createReadStream` or `sendFile`.
 * @returns {Object} a readable stream.
 */
Task.prototype.getReadStreamSync = function (fileObj, opt) {
  var localFs = this.lib.fs;
  if (!localFs.existsSync(fileObj.path)) {
    if (!fileObj.host) fileObj.host = this.grid.muuid;
    var request = {cmd: 'sendFile', path: fileObj.path, opt: opt};
    return this.grid.createStreamFrom(fileObj.host, request);
  }
  return localFs.createReadStream(fileObj.path, opt);
};
|