1 |
|
2 |
|
3 |
|
4 |
|
5 | 'use strict';
|
6 |
|
7 | var fs = require('fs');
|
8 | var zlib = require('zlib');
|
9 | var mkdirp = require('mkdirp');
|
10 | var url = require('url');
|
11 | var uuid = require('uuid');
|
12 | var Dataset = require('./dataset.js');
|
13 | var Task = require('./task.js');
|
14 | var sizeOf = require('./rough-sizeof.js');
|
15 | var Lines = require('./lines.js');
|
16 | var readSplit = require('./readsplit.js').readSplit;
|
17 | var AWS = require('aws-sdk');
|
18 |
|
// Command line: argv[2] is this worker's id, argv[3] the memory budget handed
// to MemoryManager (1024 when the argument is absent or empty).
var workerId = process.argv[2],
    memory = process.argv[3] || 1024;

// One memory manager per worker process; runTask attaches it to every task.
var mm = new MemoryManager(memory);

// Reference instant used by log() to print the elapsed time of each trace.
var start = process.hrtime();

// Make each worker identifiable by id in process listings.
process.title = 'skale-worker-' + workerId;
|
27 |
|
// When the IPC channel to the parent closes there is nobody left to send
// results to, so the worker logs the event and terminates.
process.on('disconnect', function onDisconnect() {
  log('disconnected, exit');
  process.exit();
});
|
32 |
|
// Dispatch requests received from the parent process over the IPC channel.
// A request is an object carrying a `req` member whose `cmd` selects the
// handler:
//   'runTask'  -> runTask(msg)
//   'runztask' -> runztask(msg)
// Anything else (non-object messages, messages without `req`, unknown
// commands) is ignored.
process.on('message', function (msg) {
  // typeof null is 'object', so null must be rejected explicitly; otherwise
  // reading msg.req below would throw and take the whole worker down.
  if (msg === null || typeof msg !== 'object' || !msg.req) return;

  switch (msg.req.cmd) {
  case 'runTask':
    runTask(msg);
    break;
  case 'runztask':
    runztask(msg);
    break;
  }
});
|
45 |
|
/**
 * Run a task whose serialized description was handed over as a gzip file.
 *
 * msg.req.args holds the path of the file. The file is read, removed,
 * gunzipped, and the resulting Buffer replaces msg.req.args before the
 * message is passed on to runTask().
 *
 * Read and decompression failures are thrown from inside the asynchronous
 * callbacks, i.e. they surface as uncaught exceptions of the worker process.
 *
 * @param {object} msg - request message; msg.req.args is the file path.
 */
function runztask(msg) {
  var file = msg.req.args;

  fs.readFile(file, function (err, data) {
    // Best-effort removal, attempted whether or not the read succeeded;
    // a failure to unlink is deliberately ignored.
    fs.unlink(file, function () {});

    // Rethrow the original error: wrapping it in new Error(err) would
    // stringify it and lose its code, errno, path and stack.
    if (err) throw err;

    zlib.gunzip(data, {chunkSize: 65536}, function (err, data) {
      if (err) throw err;
      msg.req.args = data;
      runTask(msg);
    });
  });
}
|
58 |
|
/**
 * Rebuild the task carried by a request, give it its worker-side context and
 * run it; the result is sent back to the parent on the same message object.
 *
 * @param {object} msg - request message; msg.req.args is the serialized task
 *   accepted by parseTask().
 */
function runTask(msg) {
  var task = parseTask(msg.req.args);

  // Modules and helpers made reachable from task code through task.lib.
  var toolbox = {
    AWS: AWS,
    fs: fs,
    Lines: Lines,
    mkdirp: mkdirp,
    mm: mm,
    readSplit: readSplit,
    url: url,
    uuid: uuid,
    zlib: zlib
  };

  // Completion callback: the task description is dropped from the message
  // before the message travels back, carrying only the result.
  function reply(result) {
    delete msg.req.args;
    msg.result = result;
    process.send(msg);
  }

  task.workerId = workerId;
  task.grid = {host: {}};
  task.mm = mm;
  task.lib = toolbox;
  task.log = log;

  task.run(reply);
}
|
72 |
|
/**
 * Rebuild a runnable task from its JSON serialization.
 *
 * Three things happen:
 *  1. String members that hold function source are turned back into
 *     functions: `function name(args) {body}` through new Function(), arrow
 *     functions (`(a, b) => ...`, `() => ...`, `x => ...`) through an
 *     indirect, global-scope eval. Every other string is left untouched.
 *  2. For every node in task.nodes, dependency references (node keys) are
 *     replaced by the node objects themselves, each partition gets
 *     Dataset.Partition.prototype and zeroed counters, and the node and its
 *     partitioner get the prototype of the Dataset member named by their
 *     `type`.
 *  3. The task itself gets Task.prototype.
 *
 * NOTE(review): the payload is executed as code (eval / new Function). That
 * is how user functions travel to the worker, so it is kept, but it is only
 * acceptable while the task source is trusted.
 *
 * @param {string|Buffer} str - JSON text of the task.
 * @returns {Task} the revived task.
 * @throws {SyntaxError} if the JSON, or revived function source, is invalid.
 */
function parseTask(str) {
  // A real function header: `function`, optional name, parenthesized
  // parameter list (captured), opening brace. Merely starting with the
  // letters "function" is not enough: plain data such as "functional tests"
  // must stay a string instead of crashing or being compiled.
  var FUNCTION_HEADER = /^function\b\s*[^()]*\(([^)]*)\)\s*\{/;
  // Parenthesized arrow parameters; the list may be empty (`() => ...`) but
  // may not start with a nested parenthesis.
  var PAREN_ARROW = /^\s*\(\s*(?:[^(][^)]*)?\)\s*=>/;
  // Single bare parameter: `x => ...`.
  var BARE_ARROW = /^\s*\w+\s*=>/;
  // Calling eval through another name makes it an indirect eval, evaluated
  // in the global scope rather than in this function's scope.
  var indirectEval = eval;
  var i, j, node, partition;

  var task = JSON.parse(str, function (key, value) {
    if (typeof value !== 'string') return value;

    var header = FUNCTION_HEADER.exec(value);
    if (header) {
      // Body is what follows the header, minus the closing brace.
      return new Function(header[1], value.slice(header[0].length).replace(/}$/, ''));
    }
    if (PAREN_ARROW.test(value) || BARE_ARROW.test(value)) {
      return indirectEval(value);
    }
    return value;
  });

  for (i in task.nodes) {
    node = task.nodes[i];

    // Dependencies are serialized as node keys; link the actual nodes.
    for (j in node.dependencies) {
      node.dependencies[j] = task.nodes[node.dependencies[j]];
    }

    for (j in node.partitions) {
      partition = node.partitions[j];
      Object.setPrototypeOf(partition, Dataset.Partition.prototype);
      partition.count = 0;
      partition.bsize = 0;
      partition.tsize = 0;
      partition.skip = false;
    }

    if (node.type) {
      Object.setPrototypeOf(node, Dataset[node.type].prototype);
    }
    if (node.partitioner && node.partitioner.type) {
      Object.setPrototypeOf(node.partitioner, Dataset[node.partitioner.type].prototype);
    }
  }
  Object.setPrototypeOf(task, Task.prototype);

  return task;
}
|
112 |
|
/**
 * Per-worker memory accounting and partition registry.
 *
 * The budget is (memory - 100) scaled by 1024 * 1024; 40% of it bounds
 * storageMemory, 20% shuffleMemory and 20% collectMemory. The counters are
 * plain public fields that callers update themselves; the *Full() predicates
 * only compare them with their limit.
 * NOTE(review): the 100 subtracted up front is presumably headroom kept for
 * the runtime itself; confirm with the author.
 *
 * @constructor
 * @param {number|string} memory - memory size, in units of 1024 * 1024
 *   bytes (numeric strings such as a command-line argument are coerced).
 */
function MemoryManager(memory) {
  var Kb = 1024, Mb = 1024 * Kb;
  var MAX_MEMORY = (memory - 100) * Mb;
  var maxStorageMemory = MAX_MEMORY * 0.4;
  var maxShuffleMemory = MAX_MEMORY * 0.2;
  var maxCollectMemory = MAX_MEMORY * 0.2;

  // Registry key of a partition: '<datasetId>.<partitionIndex>'.
  function partitionKey(partition) {
    return partition.datasetId + '.' + partition.partitionIndex;
  }

  this.storageMemory = 0;
  this.shuffleMemory = 0;
  this.collectMemory = 0;
  this.sizeOf = sizeOf;

  // Each predicate is true once its counter strictly exceeds its limit.
  this.storageFull = function () {return (this.storageMemory > maxStorageMemory);};
  this.shuffleFull = function () {return (this.shuffleMemory > maxShuffleMemory);};
  this.collectFull = function () {return (this.collectMemory > maxCollectMemory);};

  this.partitions = {};

  // Record a partition; an already registered key is left untouched.
  this.register = function (partition) {
    var key = partitionKey(partition);
    if (!(key in this.partitions)) this.partitions[key] = partition;
  };

  // Forget a partition. The key is deleted rather than set to undefined:
  // register() tests `key in this.partitions`, so a leftover key would make
  // it impossible to ever register that partition again.
  this.unregister = function (partition) {
    delete this.partitions[partitionKey(partition)];
  };

  // True when a partition is currently registered under this key.
  this.isAvailable = function (partition) {
    return (this.partitions[partitionKey(partition)] != undefined);
  };
}
|
143 |
|
// Trace helper. Tracing is active only when SKALE_DEBUG is greater than 1;
// otherwise log() is a no-op. Active traces go to stderr, prefixed with the
// worker id and the seconds elapsed since `start`.
var log;
if (!(process.env.SKALE_DEBUG > 1)) {
  log = function () {};
} else {
  log = function () {
    var params = Array.prototype.slice.call(arguments);
    var delta = process.hrtime(start);
    var seconds = delta[0] + delta[1] / 1e9;
    var prefix = '[worker-' + process.argv[2] + ' ' + seconds.toFixed(3) + 's]';
    console.error.apply(null, [prefix].concat(params));
  };
}
|