UNPKG

4.54 kB · JavaScript · View Raw
1// Copyright 2016 Luca-SAS, licensed under the Apache License 2.0
2
3// worker module
4
5'use strict';
6
7var fs = require('fs');
8var zlib = require('zlib');
9var mkdirp = require('mkdirp');
10var url = require('url');
11var uuid = require('uuid');
12var Dataset = require('./dataset.js');
13var Task = require('./task.js');
14var sizeOf = require('./rough-sizeof.js');
15var Lines = require('./lines.js');
16var readSplit = require('./readsplit.js').readSplit;
17var AWS = require('aws-sdk');
18
// Command line: argv[2] is this worker's id, argv[3] its memory budget in MB
// (falls back to 1024 when absent or empty).
var workerId = process.argv[2];
var memory = process.argv[3] ? process.argv[3] : 1024;

// MemoryManager is a hoisted function declaration, so it is usable here.
var mm = new MemoryManager(memory);

// Reference instant for the elapsed-time prefix printed by log().
var start = process.hrtime();

process.title = 'skale-worker-' + workerId;

// When the IPC channel to the parent closes, log it and terminate.
process.on('disconnect', function onParentDisconnect() {
  log('disconnected, exit');
  process.exit();
});
32
// Dispatch requests coming from the parent process. Non-object messages,
// objects without a `req` field and unknown commands are ignored.
// NOTE(review): a `null` message passes the typeof check and throws on `.req`.
process.on('message', function onRequest(msg) {
  if (typeof msg !== 'object' || !msg.req) return;

  var cmd = msg.req.cmd;
  if (cmd === 'runTask') {
    runTask(msg);
  } else if (cmd === 'runztask') {
    runztask(msg);
  }
});
45
/**
 * Handle a `runztask` request, whose payload is stored in a file.
 *
 * msg.req.args is the path of a gzip-compressed file holding the serialized
 * task. The file is deleted right after it has been read. Its decompressed
 * content (a Buffer) replaces the path in msg.req.args, and the message is
 * then handed to runTask().
 *
 * Read or decompression failures are rethrown from the async callback. With
 * no 'uncaughtException' handler installed, that terminates the worker.
 *
 * @param {Object} msg - IPC request; msg.req.args is a file path.
 */
function runztask(msg) {
  var file = msg.req.args;
  fs.readFile(file, function (readErr, compressed) {
    // The file is only read once; unlink failures are deliberately ignored.
    fs.unlink(file, function () {});
    // Rethrow the original error: `new Error(err)` would flatten it to a
    // message string and lose its stack, `code` and `path`.
    if (readErr) throw readErr;
    zlib.gunzip(compressed, {chunkSize: 65536}, function (gunzipErr, payload) {
      if (gunzipErr) throw gunzipErr;
      msg.req.args = payload;
      runTask(msg);
    });
  });
}
58
/**
 * Rebuild the task carried by `msg`, attach this worker's context to it,
 * execute it and send the message back to the parent with the result.
 *
 * @param {Object} msg - IPC request; msg.req.args is the serialized task
 *   (a string, or the Buffer produced by runztask).
 */
function runTask(msg) {
  var task = parseTask(msg.req.args);

  // Worker-local context exposed to the task's code.
  Object.assign(task, {
    workerId: workerId,
    grid: {host: {}},
    mm: mm,
    lib: {
      AWS: AWS,
      fs: fs,
      Lines: Lines,
      mkdirp: mkdirp,
      mm: mm,
      readSplit: readSplit,
      url: url,
      uuid: uuid,
      zlib: zlib
    },
    log: log
  });

  task.run(function onTaskDone(result) {
    // Drop the serialized task before replying (presumably so the payload
    // is not echoed back to the parent).
    delete msg.req.args;
    msg.result = result;
    process.send(msg);
  });
}
72
/**
 * Deserialize a task received from the parent and restore its runtime shape.
 *
 * Functions travel inside the JSON as their source text and are recompiled:
 *  - `function [name](args) { body }` sources go through the Function
 *    constructor;
 *  - arrow functions (`(a, b) => …`, `() => …`, `a => …`) go through indirect
 *    eval.
 * Any other string, including text that merely starts with "function", is
 * kept as-is.
 * SECURITY: this executes code contained in `str`, so `str` must come from a
 * trusted source.
 *
 * Each entry of `task.nodes` is then rehydrated:
 *  - dependency ids are replaced by the referenced node objects;
 *  - partitions become Dataset.Partition instances with zeroed counters;
 *  - the node's prototype, and its partitioner's, are set from `Dataset[type]`.
 * Finally the task itself gets the Task prototype.
 *
 * @param {string|Buffer} str - JSON-serialized task.
 * @returns {Task} the rehydrated task.
 */
function parseTask(str) {
  // `function`, optional `*`/name, `(params)`, then `{`. Group 1 is the
  // parameter list. This is the same prefix the original stripped, but it is
  // now required to be present before the string is treated as code.
  var functionHeader = /^function[^(]*\(([^)]*)\)\s*\{/;
  // `(params) =>`, where the parameter list may now be empty: `() =>`.
  var parenArrow = /^\s*\(\s*(?:[^(][^)]*)?\)\s*=>/;
  // Single bare parameter: `x =>`.
  var bareArrow = /^\s*\w+\s*=>/;

  var task = JSON.parse(str, function (key, value) {
    if (typeof value !== 'string') return value;

    if (value.substring(0, 8) === 'function') {
      var header = value.match(functionHeader);
      // Not a function source (e.g. "functional"): keep the plain string.
      if (!header) return value;
      var body = value.slice(header[0].length).replace(/}$/, '');
      return new Function(header[1], body);
    }

    if (parenArrow.test(value) || bareArrow.test(value)) {
      // Indirect eval: evaluated in global scope, not in this closure.
      return ('indirect', eval)(value);
    }

    return value;
  });

  var nodes = task.nodes;
  for (var id in nodes) {
    var node = nodes[id];

    // Dependencies are serialized as node ids; swap in the node objects.
    for (var d in node.dependencies) {
      node.dependencies[d] = nodes[node.dependencies[d]];
    }

    for (var p in node.partitions) {
      var partition = node.partitions[p];
      Object.setPrototypeOf(partition, Dataset.Partition.prototype);
      // Counters are reset whatever values were serialized.
      partition.count = 0;
      partition.bsize = 0;
      partition.tsize = 0;
      partition.skip = false;
    }

    if (node.type) {
      Object.setPrototypeOf(node, Dataset[node.type].prototype);
    }
    if (node.partitioner && node.partitioner.type) {
      Object.setPrototypeOf(node.partitioner, Dataset[node.partitioner.type].prototype);
    }
  }

  Object.setPrototypeOf(task, Task.prototype);
  return task;
}
112
/**
 * Per-worker memory accounting and partition registry.
 *
 * @param {number|string} memory - Memory budget in megabytes. 100 MB are held
 *   back (presumably for the runtime itself). The remainder is split into
 *   byte quotas: storage 40%, shuffle 20%, collect 20%.
 */
function MemoryManager(memory) {
  var Kb = 1024, Mb = 1024 * Kb;
  var MAX_MEMORY = (memory - 100) * Mb;
  var maxStorageMemory = MAX_MEMORY * 0.4;
  var maxShuffleMemory = MAX_MEMORY * 0.2;
  var maxCollectMemory = MAX_MEMORY * 0.2;

  // Usage counters in bytes, compared against the quotas above. They are
  // presumably incremented by callers elsewhere; TODO confirm.
  this.storageMemory = 0;
  this.shuffleMemory = 0;
  this.collectMemory = 0;
  // Exposes the module's rough-sizeof helper to users of the manager.
  this.sizeOf = sizeOf;

  // True once the corresponding counter exceeds its quota.
  this.storageFull = function () {return (this.storageMemory > maxStorageMemory);};
  this.shuffleFull = function () {return (this.shuffleMemory > maxShuffleMemory);};
  this.collectFull = function () {return (this.collectMemory > maxCollectMemory);};

  // Registered partitions, keyed by "<datasetId>.<partitionIndex>".
  this.partitions = {};

  function keyOf(partition) {
    return partition.datasetId + '.' + partition.partitionIndex;
  }

  // Record a partition unless one with the same key is currently registered
  // (the first registration wins).
  this.register = function (partition) {
    // Check the stored value, not key presence: unregister() leaves the key
    // behind with an undefined value. A `key in` test would therefore block
    // the partition from ever being registered again.
    if (!this.isAvailable(partition)) this.partitions[keyOf(partition)] = partition;
  };

  // Forget a partition. The key is kept with an undefined value.
  this.unregister = function (partition) {
    this.partitions[keyOf(partition)] = undefined;
  };

  // True when a partition with the same key is currently registered.
  this.isAvailable = function (partition) {
    return (this.partitions[keyOf(partition)] != undefined);
  };
}
143
// Debug logger. It is active only when SKALE_DEBUG > 1 and a no-op otherwise.
// When active, each call writes its arguments to stderr, prefixed with the
// worker id and the seconds elapsed since `start`.
var log;

if (!(process.env.SKALE_DEBUG > 1)) {
  log = function () {};
} else {
  log = function () {
    var parts = Array.prototype.slice.call(arguments);
    var t = process.hrtime(start);
    var seconds = (t[0] + t[1] / 1e9).toFixed(3);
    parts.unshift('[worker-' + process.argv[2] + ' ' + seconds + 's]');
    console.error.apply(null, parts);
  };
}