1 |
|
2 |
|
3 | 'use strict';
|
4 |
|
5 | var fs = require('fs');
|
6 | var path = require('path');
|
7 | var url = require('url');
|
8 | var util = require('util');
|
9 | var stream = require('stream');
|
10 | var thenify = require('thenify').withCallback;
|
11 | var uuid = require('uuid');
|
12 | var splitLocalFile = require('./readsplit.js').splitLocalFile;
|
13 | var splitHDFSFile = require('./readsplit.js').splitHDFSFile;
|
14 | var AWS = require('aws-sdk');
|
15 | var merge2 = require('merge2');
|
16 |
|
/**
 * Base constructor shared by every dataset node.
 *
 * @param {Object} sc - context; its `datasetIdCounter` is read and then
 *   post-incremented to allocate this dataset's id
 * @param {Array} [dependencies] - parent datasets (defaults to an empty list)
 */
function Dataset(sc, dependencies) {
  var nextId = sc.datasetIdCounter++;

  this.id = nextId;
  this.dependencies = dependencies ? dependencies : [];
  this.persistent = false;
  this.sc = sc;
}
|
23 |
|
/** Flag this dataset as persistent and return it for chaining. */
Dataset.prototype.persist = function () {
  this.persistent = true;
  return this;
};

/** Build a Map dataset applying `mapper` (with optional `args`) on top of this one. */
Dataset.prototype.map = function (mapper, args) {
  var child = new Map(this, mapper, args);
  return child;
};

/** Build a FlatMap dataset on top of this one. */
Dataset.prototype.flatMap = function (mapper, args) {
  var child = new FlatMap(this, mapper, args);
  return child;
};

/** Build a MapValues dataset on top of this one. */
Dataset.prototype.mapValues = function (mapper, args) {
  var child = new MapValues(this, mapper, args);
  return child;
};

/** Build a FlatMapValues dataset on top of this one. */
Dataset.prototype.flatMapValues = function (mapper, args) {
  var child = new FlatMapValues(this, mapper, args);
  return child;
};

/** Build a Filter dataset on top of this one. */
Dataset.prototype.filter = function (filter, args) {
  var child = new Filter(this, filter, args);
  return child;
};

/** Build a Sample dataset; a falsy `seed` (undefined, 0, ...) is replaced by 1. */
Dataset.prototype.sample = function (withReplacement, frac, seed) {
  var effectiveSeed = seed || 1;
  return new Sample(this, withReplacement, frac, effectiveSeed);
};

/** Union with `other`; a dataset carrying the same id is returned as-is (loose comparison). */
Dataset.prototype.union = function (other) {
  if (other.id == this.id) {
    return this;
  }
  return new Union(this.sc, [this, other]);
};
|
39 |
|
/**
 * Build an AggregateByKey dataset over this dataset.
 *
 * @throws {Error} when called with fewer than three arguments
 */
Dataset.prototype.aggregateByKey = function (reducer, combiner, init, args) {
  var supplied = arguments.length;
  if (supplied < 3) {
    throw new Error('Missing argument for function aggregateByKey().');
  }
  var parents = [this];
  return new AggregateByKey(this.sc, parents, reducer, combiner, init, args);
};

/**
 * Build an AggregateByKey dataset where `reducer` is used both as reducer and
 * as combiner.
 *
 * @throws {Error} when called with fewer than two arguments
 */
Dataset.prototype.reduceByKey = function (reducer, init, args) {
  var supplied = arguments.length;
  if (supplied < 2) {
    throw new Error('Missing argument for function reduceByKey().');
  }
  var parents = [this];
  return new AggregateByKey(this.sc, parents, reducer, reducer, init, args);
};
|
49 |
|
/**
 * Group values by key: an AggregateByKey whose accumulator is an array that
 * starts empty and receives each value.
 */
Dataset.prototype.groupByKey = function () {
  var parents = [this];
  var emptyGroup = [];
  var noArgs = {};

  // Append one value to the accumulated array.
  function reducer(a, b) {
    a.push(b);
    return a;
  }

  // Concatenate two accumulated arrays.
  function combiner(a, b) {
    var merged = a.concat(b);
    return merged;
  }

  return new AggregateByKey(this.sc, parents, reducer, combiner, emptyGroup, noArgs);
};
|
55 |
|
/**
 * Co-group this dataset with `other`: an AggregateByKey over both parents,
 * starting from an empty array.
 */
Dataset.prototype.coGroup = function (other) {
  var parents = [this, other];

  // Append one value to the accumulated array.
  function reducer(a, b) {
    a.push(b);
    return a;
  }

  // Merge accumulator `b` into `a` slot by slot.
  function combiner(a, b) {
    var slot = 0;
    while (slot < b.length) {
      a[slot] = a[slot].concat(b[slot]);
      slot++;
    }
    return a;
  }

  return new AggregateByKey(this.sc, parents, reducer, combiner, [], {});
};
|
64 |
|
/** Build a Cartesian dataset over this dataset and `other`. */
Dataset.prototype.cartesian = function (other) {
  var parents = [this, other];
  return new Cartesian(this.sc, parents);
};

/** Build a SortBy dataset ordered by the value `sorter` extracts from each element. */
Dataset.prototype.sortBy = function (sorter, ascending, numPartitions) {
  var sorted = new SortBy(this.sc, this, sorter, ascending, numPartitions);
  return sorted;
};

/** Build a PartitionBy dataset using `partitioner`. */
Dataset.prototype.partitionBy = function (partitioner) {
  var repartitioned = new PartitionBy(this.sc, this, partitioner);
  return repartitioned;
};

/** Build a SortBy dataset ordered by the first item of each element (the key). */
Dataset.prototype.sortByKey = function (ascending, numPartitions) {
  var byKey = function (data) {return data[0];};
  return new SortBy(this.sc, this, byKey, ascending, numPartitions);
};
|
78 |
|
/**
 * Inner join on key: co-groups this dataset with `other`, then emits one
 * [left, right] pair for every combination of values sharing a key.
 *
 * The value mapper uses indexed loops rather than `for...in`, so enumerable
 * properties added to Array.prototype never leak into the result.
 */
Dataset.prototype.join = function (other) {
  // NOTE(review): the mapper is kept free of outer references — presumably it
  // is shipped to workers like the other user functions; confirm.
  return this.coGroup(other).flatMapValues(function (v) {
    var left = v[0] || [], right = v[1] || [];
    var res = [], i, j;
    for (i = 0; i < left.length; i++)
      for (j = 0; j < right.length; j++)
        res.push([left[i], right[j]]);
    return res;
  });
};
|
88 |
|
/**
 * Left outer join on key: like join(), but a left value whose key has no
 * right-hand value is emitted once, paired with null.
 *
 * Indexed loops replace `for...in` so that enumerable properties added to
 * Array.prototype never leak into the result.
 */
Dataset.prototype.leftOuterJoin = function (other) {
  // NOTE(review): the mapper is kept free of outer references — presumably it
  // is shipped to workers like the other user functions; confirm.
  return this.coGroup(other).flatMapValues(function (v) {
    var left = v[0] || [], right = v[1];
    var res = [], i, j;
    if (right.length == 0) {
      for (i = 0; i < left.length; i++) res.push([left[i], null]);
    } else {
      for (i = 0; i < left.length; i++)
        for (j = 0; j < right.length; j++) res.push([left[i], right[j]]);
    }
    return res;
  });
};
|
101 |
|
/**
 * Right outer join on key: like join(), but a right value whose key has no
 * left-hand value is emitted once, paired with null on the left.
 *
 * Indexed loops replace `for...in` so that enumerable properties added to
 * Array.prototype never leak into the result.
 */
Dataset.prototype.rightOuterJoin = function (other) {
  // NOTE(review): the mapper is kept free of outer references — presumably it
  // is shipped to workers like the other user functions; confirm.
  return this.coGroup(other).flatMapValues(function (v) {
    var left = v[0], right = v[1] || [];
    var res = [], i, j;
    if (left.length == 0) {
      for (i = 0; i < right.length; i++) res.push([null, right[i]]);
    } else {
      for (i = 0; i < left.length; i++)
        for (j = 0; j < right.length; j++) res.push([left[i], right[j]]);
    }
    return res;
  });
};
|
114 |
|
/**
 * Remove duplicates: each element becomes a key paired with null, keys are
 * reduced (keeping the accumulator untouched), then the keys are extracted.
 */
Dataset.prototype.distinct = function () {
  var keyed = this.map(function (e) {return [e, null];});
  var uniqueKeys = keyed.reduceByKey(function (a) {return a;}, null);
  return uniqueKeys.map(function (a) {return a[0];});
};
|
120 |
|
/**
 * Elements present in both this dataset and `other`. Each side is first
 * reduced to one [element, count] entry per element, the two sides are
 * co-grouped, and an element is kept once when both groups are non-empty.
 */
Dataset.prototype.intersection = function (other) {
  function mapper(e) {return [e, 0];}
  function reducer(a) {return a + 1;}

  var mine = this.map(mapper).reduceByKey(reducer, 0);
  var theirs = other.map(mapper).reduceByKey(reducer, 0);
  var grouped = mine.coGroup(theirs);

  return grouped.flatMap(function (entry) {
    var groups = entry[1];
    if (groups[0].length && groups[1].length) return [entry[0]];
    return [];
  });
};
|
130 |
|
/**
 * Elements of this dataset that do not appear in `other`. Occurrences are
 * counted per element on each side; an element absent from `other` is emitted
 * as many times as it occurs here.
 */
Dataset.prototype.subtract = function (other) {
  function mapper(e) {return [e, 0];}
  function reducer(a) {return a + 1;}

  var mine = this.map(mapper).reduceByKey(reducer, 0);
  var theirs = other.map(mapper).reduceByKey(reducer, 0);
  var grouped = mine.coGroup(theirs);

  return grouped.flatMap(function (entry) {
    var kept = [];
    var groups = entry[1];
    if (groups[0].length && (groups[1].length == 0)) {
      // groups[0][0] is the occurrence count computed above.
      for (var n = 0; n < groups[0][0]; n++) kept.push(entry[0]);
    }
    return kept;
  });
};
|
143 |
|
/** Map each element to its first item (the key). */
Dataset.prototype.keys = function () {
  var firstItem = function (a) {return a[0];};
  return this.map(firstItem);
};

/** Map each element to its second item (the value). */
Dataset.prototype.values = function () {
  var secondItem = function (a) {return a[1];};
  return this.map(secondItem);
};
|
147 |
|
/**
 * Collect the values of every element whose key is strictly equal to `key`.
 * The key is passed to the filter through its `args` object rather than
 * captured from the enclosing scope.
 */
Dataset.prototype.lookup = thenify(function (key, done) {
  var filterArgs = {key: key};
  var matching = this.filter(function (kv, args) {return kv[0] === args.key;}, filterArgs);
  var valuesOnly = matching.map(function (kv) {return kv[1];});
  return valuesOnly.collect(done);
});
|
152 |
|
/** Collect [element, occurrences] pairs: each element is keyed with 1 and the ones are summed. */
Dataset.prototype.countByValue = thenify(function (done) {
  var ones = this.map(function (e) {return [e, 1];});
  var totals = ones.reduceByKey(function (a, b) {return a + b;}, 0);
  return totals.collect(done);
});

/** Collect [key, occurrences] pairs: every value is replaced by 1 and the ones are summed per key. */
Dataset.prototype.countByKey = thenify(function (done) {
  var ones = this.mapValues(function () {return 1;});
  var totals = ones.reduceByKey(function (a, b) {return a + b;}, 0);
  return totals.collect(done);
});
|
164 |
|
/**
 * Gather every element of the dataset into a single array.
 *
 * Tasks are run one after the other and their partial results are
 * concatenated in task order.
 *
 * @param {Function} done - called with (err, result); `err` is the first
 *   error reported by any task (previously only the last task's error was
 *   reported, earlier ones being silently dropped)
 */
Dataset.prototype.collect = thenify(function (done) {
  var reducer = function (a, b) {a.push(b); return a;};
  var combiner = function (a, b) {return a.concat(b);};
  var init = [], action = {args: [], src: reducer, init: init, opt: {}}, self = this;

  return this.sc.runJob({}, this, action, function (job, tasks) {
    // Work on a deep copy of init so the action descriptor is never mutated.
    var result = JSON.parse(JSON.stringify(init)), cnt = 0, error;

    function taskDone(err, res) {
      if (err && !error) error = err;
      // A failed task may come back without a result: skip it rather than crash.
      if (res) result = combiner(result, res.data);
      if (++cnt < tasks.length) return self.sc.runTask(tasks[cnt], taskDone);
      self.sc.log('result stage done');
      done(error || err, result);
    }

    // Nothing to run: answer with the empty result instead of running an
    // undefined task.
    if (tasks.length === 0) return done(undefined, result);
    self.sc.runTask(tasks[cnt], taskDone);
  });
});
|
182 |
|
183 |
|
184 |
|
185 |
|
/**
 * Export the dataset as a readable stream of newline-delimited JSON.
 *
 * Each partition is written to an export file by the per-partition hooks
 * below; the resulting per-partition read streams are then added, in
 * partition order, to a merge2 stream which is returned immediately.
 *
 * @param {Object} [options]
 * @param {boolean} [options.gzip] - gzip the exported data
 * @param {boolean} [options.end] - register `sc.end` on the stream's 'end' event
 * @returns {Stream} the merged output stream
 */
Dataset.prototype.stream = function (options) {
  options = options || {};
  var self = this;
  var outStream = merge2();
  // NOTE(review): the hooks and the reducer only use what they receive through
  // `opt` and `wc` — presumably because they run in a worker context; keep
  // them free of references to the enclosing scope.
  var opt = {
    gzip: options.gzip,
    _preIterate: function (opt, wc, p) {
      var suffix = opt.gzip ? '.gz' : '';
      wc.exportFile = wc.basedir + 'export/' + p + suffix;
    },
    _postIterate: function (acc, opt, wc, p, done) {
      var fs = wc.lib.fs;
      var zlib = wc.lib.zlib;
      // Flush whatever the reducer still holds.
      if (opt.gzip) {
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536, // was misspelled `chunckSize`, hence ignored by zlib
          level: zlib.Z_BEST_SPEED
        }));
      } else {
        fs.appendFileSync(wc.exportFile, acc);
      }
      done(wc.exportFile);
    }
  };
  var pstreams = [];

  // Accumulate one JSON line per element; spill to the export file each time
  // the buffer reaches 64 Ki characters.
  function reducer(acc, val, opt, wc) {
    acc = acc.concat(JSON.stringify(val) + '\n');
    if (acc.length >= 65536) {
      var fs = wc.lib.fs;
      if (opt.gzip) {
        var zlib = wc.lib.zlib;
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536,
          level: zlib.Z_BEST_SPEED
        }));
      } else {
        fs.appendFileSync(wc.exportFile, acc);
      }
      acc = '';
    }
    return acc;
  }

  // Index each partition's read stream by the partition number found at the
  // end of its export path.
  function combiner(acc1, acc2) {
    var p = acc2.path.match(/.+\/([0-9]+)/)[1];
    pstreams[p] = self.sc.getReadStreamSync(acc2);
  }

  this.aggregate(reducer, combiner, '', opt, function () {
    for (var i = 0; i < pstreams.length; i++)
      outStream.add(pstreams[i]);
  });

  if (options.end) outStream.on('end', self.sc.end);
  return outStream;
};
|
243 |
|
244 |
|
245 |
|
246 |
|
247 |
|
248 |
|
249 |
|
250 |
|
251 |
|
252 |
|
/**
 * Save the dataset as newline-delimited JSON, one output file per partition,
 * under `path`. `path` may be a plain directory, a `file:` URL or an `s3:` URL.
 *
 * @param {string} path - destination directory or URL
 * @param {Object} [options]
 * @param {boolean} [options.gzip] - gzip each output file (adds a '.gz' suffix)
 * @param {Function} done - completion callback; may be given in place of `options`
 */
Dataset.prototype.save = thenify(function (path, options, done) {
  options = options || {};
  if (arguments.length < 3) done = options;
  // NOTE(review): the hooks and the reducer only use what they receive through
  // `opt` and `wc` — presumably because they run in a worker context; keep
  // them free of references to the enclosing scope.
  var opt = {
    gzip: options.gzip,
    path: path,
    _preIterate: function (opt, wc, p) {
      var suffix = opt.gzip ? '.gz' : '';
      wc.exportFile = wc.basedir + 'export/' + p + suffix;
    },
    _postIterate: function (acc, opt, wc, p, done) {
      var suffix = opt.gzip ? '.gz' : '';
      var fs = wc.lib.fs;
      var zlib = wc.lib.zlib;
      var target, readStream, writeStream;
      // Flush whatever the reducer still holds.
      if (opt.gzip)
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536, // was misspelled `chunckSize`, hence ignored by zlib
          level: zlib.Z_BEST_SPEED
        }));
      else
        fs.appendFileSync(wc.exportFile, acc);
      readStream = fs.createReadStream(wc.exportFile);
      target = wc.lib.url.parse(opt.path);
      switch (target.protocol) {
      case 's3:':
        var s3 = new wc.lib.AWS.S3({
          httpOptions: {timeout: 3600000},
          signatureVersion: 'v4'
        });
        s3.upload({
          Bucket: target.host,
          Key: target.path.slice(1) + '/' + p + suffix,
          Body: readStream
        }, function (err, data) {
          if (err) wc.log('S3 upload error', err);
          wc.log('upload to s3', data);
          done();
        });
        break;
      case 'file:':
      case null:
        // Create the directory that is actually written to. The raw
        // `opt.path` still carries the scheme for file: URLs, so using it
        // here created a directory literally named after the URL.
        wc.lib.mkdirp.sync(target.path);
        writeStream = fs.createWriteStream(target.path + '/' + p + suffix);
        readStream.pipe(writeStream);
        writeStream.on('close', done);
        break;
      default:
        wc.log('Error: unsupported protocol', target.protocol);
        done();
      }
    }
  };

  // Accumulate one JSON line per element; spill to the export file each time
  // the buffer reaches 64 Ki characters.
  function reducer(acc, val, opt, wc) {
    acc = acc.concat(JSON.stringify(val) + '\n');
    if (acc.length >= 65536) {
      var fs = wc.lib.fs;
      if (opt.gzip) {
        var zlib = wc.lib.zlib;
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536,
          level: zlib.Z_BEST_SPEED
        }));
      } else {
        fs.appendFileSync(wc.exportFile, acc);
      }
      acc = '';
    }
    return acc;
  }

  return this.aggregate(reducer, function(){}, '', opt, done);
});
|
327 |
|
/**
 * Fetch the first element of the dataset through take(1).
 * `done` receives (err, element), or just (err) when take yields no result.
 */
Dataset.prototype.first = thenify(function (done) {
  return this.take(1, function (err, res) {
    if (!res) {
      done(err);
      return;
    }
    done(err, res[0]);
  });
});
|
334 |
|
/**
 * Return the first N elements of the dataset.
 *
 * Tasks are run one at a time, in order, and the run stops as soon as N
 * elements have been gathered.
 *
 * @param {number} N - maximum number of elements to return
 * @param {Function} done - called with (err, result); `err` is the first
 *   error reported by any task (earlier errors used to be dropped)
 */
Dataset.prototype.take = thenify(function (N, done) {
  var reducer = function (a, b) {a.push(b); return a;};
  var combiner = function (a, b) {return a.concat(b);};
  var init = [], action = {args: [], src: reducer, init: init, opt: {}}, self = this;

  return this.sc.runJob({}, this, action, function (job, tasks) {
    // Work on a deep copy of init so the action descriptor is never mutated.
    var result = JSON.parse(JSON.stringify(init)), cnt = 0, error;

    function taskDone(err, res) {
      if (err && !error) error = err;
      // A failed task may come back without a result: skip it rather than crash.
      if (res) result = combiner(result, res.data);
      if (++cnt < tasks.length && result.length < N) self.sc.runTask(tasks[cnt], taskDone);
      else done(error || err, result.slice(0, N));
    }

    // Nothing to run: answer with the empty result instead of running an
    // undefined task.
    if (tasks.length === 0) return done(undefined, result);
    self.sc.runTask(tasks[cnt], taskDone);
  });
});
|
352 |
|
/**
 * Return the last N elements of the dataset.
 *
 * Tasks are run one at a time starting from the last one, each partial result
 * being prepended, and the run stops as soon as N elements have been gathered.
 *
 * The continuation test used to be `--cnt >= 0 || result.length < N`: it kept
 * running every task even once N elements were available, and then ran
 * `tasks[-1]` (undefined) whenever the dataset held fewer than N elements.
 *
 * @param {number} N - maximum number of elements to return
 * @param {Function} done - called with (err, result); `err` is the first
 *   error reported by any task
 */
Dataset.prototype.top = thenify(function (N, done) {
  var reducer = function (a, b) {a.push(b); return a;};
  var combiner = function (a, b) {return b.concat(a);};
  var init = [], action = {args: [], src: reducer, init: init, opt: {}}, self = this;

  return this.sc.runJob({}, this, action, function (job, tasks) {
    // Work on a deep copy of init so the action descriptor is never mutated.
    var result = JSON.parse(JSON.stringify(init)), cnt = tasks.length - 1, error;

    function taskDone(err, res) {
      if (err && !error) error = err;
      // A failed task may come back without a result: skip it rather than crash.
      if (res) result = combiner(result, res.data);
      // Go on only while tasks remain AND more elements are still needed.
      if (--cnt >= 0 && result.length < N) self.sc.runTask(tasks[cnt], taskDone);
      else done(error || err, result.slice(-N));
    }

    // Nothing to run: answer with the empty result instead of running an
    // undefined task.
    if (tasks.length === 0) return done(undefined, result);
    self.sc.runTask(tasks[cnt], taskDone);
  });
});
|
370 |
|
/**
 * Run `reducer` over every partition and merge the partial results with
 * `combiner`, starting from a deep copy of `init`.
 *
 * At most `sc.worker.length` tasks are in flight at any time; partial results
 * are combined in completion order.
 *
 * @param {Function} reducer - per-element reducer, used as the job action
 * @param {Function} combiner - called as combiner(result, partial, opt)
 * @param {*} init - initial accumulator (must survive a JSON round-trip)
 * @param {Object} [opt] - job options; may be omitted, `done` taking its place
 * @param {Function} done - called with (err, result) once every task has
 *   completed; `err` is the first error reported by any task (the `error`
 *   variable used to be declared but never assigned)
 */
Dataset.prototype.aggregate = thenify(function (reducer, combiner, init, opt, done) {
  opt = opt || {};
  var action = {args: [], src: reducer, init: init, opt: opt}, self = this;

  if (arguments.length < 5) done = opt;

  return this.sc.runJob(opt, this, action, function (job, tasks) {
    var result = JSON.parse(JSON.stringify(init));
    var nworker = self.sc.worker.length;
    var index = 0;      // next task to start
    var busy = 0;       // tasks currently in flight
    var complete = 0;   // tasks finished so far
    var error;          // first error reported by a task

    function runNext() {
      while (busy < nworker && index < tasks.length) {
        busy++;
        self.sc.runTask(tasks[index++], function (err, res, task) {
          if (err && !error) error = err;
          // A failed task may come back without a result: skip it rather than crash.
          if (res) result = combiner(result, res.data, opt);
          complete++;
          busy--;
          self.sc.log('part ' + task.pid + ' done. Complete: ' + complete + '/' + tasks.length);
          if (complete === tasks.length) return done(error, result);
          runNext();
        });
      }
    }

    // Without tasks no completion callback would ever fire: answer right away.
    if (tasks.length === 0) return done(undefined, result);
    runNext();
  });
});
|
402 |
|
/**
 * Aggregate using the same function as reducer and as combiner.
 * `opt` may be omitted, in which case the callback takes its position.
 */
Dataset.prototype.reduce = thenify(function (reducer, init, opt, done) {
  var supplied = arguments.length;
  opt = opt || {};
  if (supplied < 4) {
    done = opt;
  }
  return this.aggregate(reducer, reducer, init, opt, done);
});

/** Count the elements: one is added per element, then the partial counts are summed. */
Dataset.prototype.count = thenify(function (done) {
  var increment = function (a) {return a + 1;};
  var sum = function (a, b) {return a + b;};
  return this.aggregate(increment, sum, 0, done);
});

/**
 * Apply `eacher` to every element, for its side effects only: the combiner
 * always yields null and the aggregate options carry `_foreach: true`.
 * `opt` may be omitted, in which case the callback takes its position.
 */
Dataset.prototype.forEach = thenify(function (eacher, opt, done) {
  var supplied = arguments.length;
  var arg = {opt: opt, _foreach: true};
  if (supplied < 3) {
    done = opt;
  }
  var discard = function () {return null;};
  return this.aggregate(eacher, discard, null, arg, done);
});
|
418 |
|
/**
 * Default partitioning: one partition per partition of each dependency,
 * numbered consecutively across dependencies. Computed once; later calls
 * only invoke `done`.
 */
Dataset.prototype.getPartitions = function (done) {
  if (this.partitions != undefined) {
    done();
    return;
  }

  var self = this;
  var total = 0;
  this.partitions = {};
  this.dependencies.forEach(function (parent) {
    for (var k = 0; k < parent.nPartitions; k++, total++) {
      self.partitions[total] = new Partition(self.id, total, parent.id, parent.partitions[k].partitionIndex);
    }
  });
  this.nPartitions = total;
  done();
};

/** Default: no preferred location. */
Dataset.prototype.getPreferedLocation = function () {
  return [];
};
|
435 |
|
/**
 * One partition of a dataset, able to buffer the elements flowing through it.
 *
 * NOTE(review): `count`, `bsize`, `tsize`, `skip` and `mm`, which transform()
 * relies on, are not initialised here — presumably they are set elsewhere
 * before the memory accounting is meant to kick in; confirm.
 */
function Partition(datasetId, partitionIndex, parentDatasetId, parentPartitionIndex) {
  this.data = [];
  this.datasetId = datasetId;
  this.partitionIndex = partitionIndex;
  this.parentDatasetId = parentDatasetId;
  this.parentPartitionIndex = parentPartitionIndex;
  this.type = 'Partition';
}

/**
 * Pipeline stage storing every element of `data` in this partition and
 * passing `data` on unchanged. Once the partition has been evicted (`skip`)
 * it is a pure pass-through.
 */
Partition.prototype.transform = function (context, data) {
  if (this.skip) {
    return data;
  }

  // When the call counter hits 9999: reset it, account for the memory used
  // and evict the partition if the storage is reported full.
  var accountingDue = (this.count++ == 9999);
  if (accountingDue) {
    this.count = 0;
    if (this.bsize == 0) {
      this.bsize = this.mm.sizeOf(this.data);
    }
    this.tsize += this.bsize;
    this.mm.storageMemory += this.bsize;
    if (this.mm.storageFull()) {
      console.log('# Out of memory, evict partition', this.partitionIndex);
      this.skip = true;
      this.mm.storageMemory -= this.tsize;
      this.data = [];
      this.mm.unregister(this);
      return data;
    }
  }

  var k = 0;
  while (k < data.length) {
    this.data.push(data[k]);
    k++;
  }
  return data;
};

/**
 * Replay the stored elements, one at a time, through every stage of
 * `pipeline`, then call `done`.
 */
Partition.prototype.iterate = function (task, p, pipeline, done) {
  var chunk, pos, stage;

  // The length is re-read on every turn, as a stage may append to this.data.
  for (pos = 0; pos < this.data.length; pos++) {
    chunk = [this.data[pos]];
    for (stage = 0; stage < pipeline.length; stage++) {
      chunk = pipeline[stage].transform(pipeline[stage], chunk);
    }
  }
  done();
};
|
485 |
|
/**
 * Dataset whose N elements are produced on demand by getItem(index, args, task).
 *
 * @param {Object} sc - context
 * @param {number} N - total number of elements
 * @param {Function} getItem - element generator
 * @param {*} args - passed as second argument to getItem
 * @param {number} [npart] - number of partitions (defaults to the worker count)
 */
function Source(sc, N, getItem, args, npart) {
  Dataset.call(this, sc);
  this.getItem = getItem;
  this.npart = npart;
  this.N = N;
  this.args = args;
  this.type = 'Source';
}
util.inherits(Source, Dataset);

/**
 * Generate the elements of partition `p`, one at a time, feeding each through
 * every stage of `pipeline`, then call `done`.
 */
Source.prototype.iterate = function (task, p, pipeline, done) {
  var first = this.bases[p];
  var howMany = this.sizes[p];
  var chunk, stage;

  for (var offset = 0; offset < howMany; offset++) {
    chunk = [this.getItem(first + offset, this.args, task)];
    for (stage = 0; stage < pipeline.length; stage++) {
      chunk = pipeline[stage].transform(pipeline[stage], chunk);
    }
  }
  done();
};

/**
 * Split the N elements over P partitions of ceil(N / P) elements each; the
 * trailing partitions receive the remainder, or nothing. Records, for each
 * partition, its size and the index of its first element.
 */
Source.prototype.getPartitions = function (done) {
  var P = this.npart || this.sc.worker.length;
  var N = this.N;
  var plen = Math.ceil(N / P);
  var upper = plen;   // exclusive upper bound of the current partition if it were full
  var base = 0;       // index of the first element of the current partition
  var size;

  this.partitions = {};
  this.sizes = {};
  this.bases = {};
  this.nPartitions = P;

  for (var i = 0; i < P; i++) {
    this.partitions[i] = new Partition(this.id, i);
    if (upper <= N) {
      size = plen;
    } else if (upper - N < plen) {
      size = N - (plen * i);
    } else {
      size = 0;
    }
    this.sizes[i] = size;
    this.bases[i] = base;
    base += size;
    upper += plen;
  }
  done();
};
|
523 |
|
/**
 * Build a Source dataset over a local array, optionally split in P partitions.
 *
 * @throws {Error} when `localArray` is not an instance of Array
 */
function parallelize(sc, localArray, P) {
  var isArray = localArray instanceof Array;
  if (!isArray) {
    throw new Error('First argument of parallelize() is not an instance of Array.');
  }

  var pick = function (i, a) {return a[i];};
  return new Source(sc, localArray.length, pick, localArray, P);
}
|
530 |
|
/**
 * Build a Source dataset of numbers going from `start` (inclusive) towards
 * `end` (exclusive) by increments of `step`.
 *
 * With a single bound, the range goes from 0 to that bound; `step` defaults to 1.
 */
function range(sc, start, end, step, P) {
  if (end === undefined) {
    end = start;
    start = 0;
  }
  if (step === undefined) {
    step = 1;
  }

  var length = Math.ceil((end - start) / step);
  var bounds = {step: step, start: start};
  var valueAt = function (i, a) {
    return i * a.step + a.start;
  };
  return new Source(sc, length, valueAt, bounds, P);
}
|
539 |
|
/**
 * Object-mode Transform stream turning each incoming object into one line of
 * JSON text terminated by a newline.
 */
function Obj2line() {
  var streamOptions = {objectMode: true};
  stream.Transform.call(this, streamOptions);
}
util.inherits(Obj2line, stream.Transform);

Obj2line.prototype._transform = function (chunk, encoding, done) {
  var line = JSON.stringify(chunk) + '\n';
  done(null, line);
};
|
548 |
|
/**
 * Turn a readable stream into a dataset.
 *
 * The stream is written to a uniquely named file under `tmp/`, which is
 * renamed into `stream/` once fully written. The returned text-file dataset
 * targets the final location and is flagged `watched` until that rename has
 * taken place.
 *
 * @param {Object} sc - context (provides `basedir` and `textFile`)
 * @param {Readable} stream - input stream
 * @param {string} type - 'object' for a stream of objects (serialised one
 *   JSON document per line); anything else is piped as-is
 */
function Stream(sc, stream, type) {
  var name = uuid.v4();
  var tmpFile = sc.basedir + 'tmp/' + name;
  var targetFile = sc.basedir + 'stream/' + name;
  var out = fs.createWriteStream(tmpFile);
  var dataset = sc.textFile(targetFile);
  var objects = (type == 'object');

  dataset.watched = true;
  dataset.parse = objects;

  out.on('close', function () {
    fs.renameSync(tmpFile, targetFile);
    dataset.watched = false;
  });

  var input = objects ? stream.pipe(new Obj2line()) : stream;
  input.pipe(out);
  return dataset;
}
|
568 |
|
/**
 * Dataset reading the lines of a single gzip-compressed local file, exposed
 * as one partition.
 */
function GzipFile(sc, file) {
  Dataset.call(this, sc);
  this.file = file;
  this.type = 'GzipFile';
}

util.inherits(GzipFile, Dataset);

/** A gzip file is read as a whole: exactly one partition. */
GzipFile.prototype.getPartitions = function (done) {
  this.partitions = {0: new Partition(this.id, 0)};
  this.nPartitions = 1;
  done();
};

/**
 * Decompress the file and feed it, line by line, through every stage of
 * `pipeline`, then call `done`. File and zlib access go through `task.lib`.
 */
GzipFile.prototype.iterate = function (task, p, pipeline, done) {
  var rs = task.lib.fs.createReadStream(this.file).pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));
  var tail = '';

  // Decode at the stream level: converting each chunk separately corrupts
  // any multi-byte UTF-8 character that straddles two chunks.
  rs.setEncoding('utf8');

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    // The last piece may be an incomplete line: keep it for the next chunk.
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function () {
    // Emit the last line when the file does not end with a newline.
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};
|
608 |
|
/**
 * Dataset reading the lines of a single S3 object, exposed as one partition.
 *
 * @param {Object} sc - context
 * @param {string} file - '<bucket>/<key>'
 */
function TextS3File(sc, file) {
  Dataset.call(this, sc);
  var _a = file.split('/');
  this.bucket = _a[0];
  this.path = _a.slice(1).join('/');
  this.type = 'TextS3File';
}

util.inherits(TextS3File, Dataset);

/** An S3 object is read as a whole: exactly one partition. */
TextS3File.prototype.getPartitions = function (done) {
  this.partitions = {0: new Partition(this.id, 0)};
  this.nPartitions = 1;
  done();
};

/**
 * Download the object (gunzipping it when its key ends with '.gz') and feed
 * it, line by line, through every stage of `pipeline`, then call `done`.
 * S3 and zlib access go through `task.lib`.
 */
TextS3File.prototype.iterate = function (task, p, pipeline, done) {
  var s3 = new task.lib.AWS.S3({signatureVersion: 'v4'});
  var rs = s3.getObject({Bucket: this.bucket, Key: this.path}).createReadStream();
  var tail = '';

  if (this.path.slice(-3) === '.gz')
    rs = rs.pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));

  // Decode at the stream level: converting each chunk separately corrupts
  // any multi-byte UTF-8 character that straddles two chunks.
  rs.setEncoding('utf8');

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    // The last piece may be an incomplete line: keep it for the next chunk.
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function() {
    // Emit the last line when the object does not end with a newline.
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};
|
654 |
|
/**
 * Dataset reading the lines of every S3 object found under a prefix, one
 * partition per object.
 *
 * @param {Object} sc - context
 * @param {string} dir - '<bucket>/<prefix>'
 * @param {Object} [options]
 * @param {number} [options.maxFiles] - upper bound on the number of partitions
 */
function TextS3Dir(sc, dir, options) {
  Dataset.call(this, sc);
  var _a = dir.split('/');
  this.bucket = _a[0];
  this.prefix = _a.slice(1).join('/');
  this.type = 'TextS3Dir';
  this.options = options || {};
}

util.inherits(TextS3Dir, Dataset);

/**
 * List the objects under the prefix (following continuation tokens) and
 * create one partition per object.
 *
 * A listing failure is now handed to `done(err)`. It used to be thrown from
 * inside the asynchronous S3 callback, where no caller could catch it.
 */
TextS3Dir.prototype.getPartitions = function (done) {
  var self = this;
  var s3 = new AWS.S3({signatureVersion: 'v4'});

  // Accumulate the pages of the listing, then hand the full list to `done`.
  function getList(list, token, done) {
    s3.listObjectsV2({
      Bucket: self.bucket,
      Prefix: self.prefix,
      ContinuationToken: token
    }, function (err, data) {
      if (err) return done(err);
      list = list.concat(data.Contents);
      if (data.IsTruncated)
        return getList(list, data.NextContinuationToken, done);
      done(err, list);
    });
  }

  getList([], null, function (err, res) {
    if (err) return done(err);
    // NOTE(review): the first listed entry is dropped — presumably the
    // placeholder key of the "directory" itself; confirm.
    res = res.slice(1);
    self.partitions = {};
    if (self.options.maxFiles && self.options.maxFiles < res.length)
      self.nPartitions = self.options.maxFiles;
    else
      self.nPartitions = res.length;
    for (var i = 0; i < res.length; i++) {
      self.partitions[i] = new Partition(self.id, i);
      self.partitions[i].path = res[i].Key;
    }
    done();
  });
};

/**
 * Download the object backing partition `p` (gunzipping it when its key ends
 * with '.gz') and feed it, line by line, through every stage of `pipeline`,
 * then call `done`. S3 and zlib access go through `task.lib`.
 */
TextS3Dir.prototype.iterate = function (task, p, pipeline, done) {
  var path = this.partitions[p].path;
  var tail = '';
  var s3 = new task.lib.AWS.S3({signatureVersion: 'v4'});
  var rs = s3.getObject({Bucket: this.bucket, Key: path}).createReadStream();
  if (path.slice(-3) === '.gz') rs = rs.pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));

  // Decode at the stream level: converting each chunk separately corrupts
  // any multi-byte UTF-8 character that straddles two chunks.
  rs.setEncoding('utf8');

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    // The last piece may be an incomplete line: keep it for the next chunk.
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function () {
    // Emit the last line when the object does not end with a newline.
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};
|
728 |
|
/**
 * Dataset reading the lines of every file of a local directory, one partition
 * per file.
 *
 * @param {Object} sc - context
 * @param {string} dir - directory path; file names are appended to it as-is,
 *   so it is expected to end with a path separator
 * @param {Object} [options]
 * @param {number} [options.maxFiles] - upper bound on the number of partitions
 */
function TextDir(sc, dir, options) {
  Dataset.call(this, sc);
  this.type = 'TextDir';
  this.dir = dir;
  this.options = options || {};
}

util.inherits(TextDir, Dataset);

/** List the directory and create one partition per entry. */
TextDir.prototype.getPartitions = function (done) {
  var self = this;
  fs.readdir(this.dir, function (err, res) {
    if (err) return done(err);
    self.partitions = {};
    if (self.options.maxFiles && self.options.maxFiles < res.length)
      self.nPartitions = self.options.maxFiles;
    else
      self.nPartitions = res.length;
    for (var i = 0; i < res.length; i++) {
      self.partitions[i] = new Partition(self.id, i);
      self.partitions[i].path = res[i];
    }
    done();
  });
};

/**
 * Read the file backing partition `p` (gunzipping it when its name ends with
 * '.gz') and feed it, line by line, through every stage of `pipeline`, then
 * call `done`. File and zlib access go through `task.lib`.
 */
TextDir.prototype.iterate = function (task, p, pipeline, done) {
  var file = this.dir + this.partitions[p].path;
  var tail = '';
  var rs = task.lib.fs.createReadStream(file);
  if (file.slice(-3) === '.gz') rs = rs.pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));

  // Decode at the stream level: converting each chunk separately corrupts
  // any multi-byte UTF-8 character that straddles two chunks.
  rs.setEncoding('utf8');

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    // The last piece may be an incomplete line: keep it for the next chunk.
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function () {
    // Emit the last line when the file does not end with a newline.
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};
|
782 |
|
/**
 * Dataset reading the lines of a text file cut into splits.
 *
 * @param {Object} sc - context
 * @param {string} file - file path or URL
 * @param {number} [nPartitions] - requested number of splits (defaults to the
 *   worker count)
 */
function TextFile(sc, file, nPartitions) {
  Dataset.call(this, sc);
  this.file = file;
  this.type = 'TextFile';
  this.nSplit = nPartitions || sc.worker.length;
  this.basedir = sc.basedir;
}

util.inherits(TextFile, Dataset);

/**
 * Compute the splits of the file and create one partition per split.
 *
 * When the dataset is flagged `watched`, the computation is deferred until a
 * 'rename' event for this file is observed in the `stream` directory.
 */
TextFile.prototype.getPartitions = function (done) {
  var self = this;

  // Record the splits and create the matching partitions.
  function mapLogicalSplit(split) {
    self.splits = split;
    self.partitions = {};
    self.nPartitions = self.splits.length;
    for (var k = 0; k < self.splits.length; k++) {
      self.partitions[k] = new Partition(self.id, k);
    }
    done();
  }

  // Fully qualified hdfs: URLs go through the HDFS splitter, anything else
  // through the local one.
  function getSplits() {
    var u = url.parse(self.file);
    var onHdfs = (u.protocol == 'hdfs:') && u.slashes && u.hostname && u.port;

    if (onHdfs) {
      splitHDFSFile(u.path, self.nSplit, mapLogicalSplit);
    } else {
      splitLocalFile(u.path, self.nSplit, mapLogicalSplit);
    }
  }

  if (!this.watched) {
    getSplits();
    return;
  }

  var watcher = fs.watch(self.basedir + 'stream', function (event, filename) {
    var isOurFile = (event == 'rename') && (filename == path.basename(self.file));
    if (isOurFile) {
      watcher.close();
      getSplits();
    }
  });
};

/**
 * Read split `p` through `task.lib.readSplit`, feeding every non-empty line
 * (JSON-parsed first when `this.parse` is set) through every stage of
 * `pipeline`; `done` is handed to readSplit.
 */
TextFile.prototype.iterate = function (task, p, pipeline, done) {
  var chunk;

  // Push one already-decoded element through the pipeline.
  function feed(item) {
    chunk = [item];
    for (var stage = 0; stage < pipeline.length; stage++) {
      chunk = pipeline[stage].transform(pipeline[stage], chunk);
    }
  }

  function processLine(line) {
    if (line) feed(line);
  }

  function processLineParse(line) {
    if (line) feed(JSON.parse(line));
  }

  var onLine = this.parse ? processLineParse : processLine;
  task.lib.readSplit(this.splits, this.splits[p].index, onLine, done, function (part, opt) {
    return task.getReadStreamSync(part, opt);
  });
};

/** Preferred location of a partition: the `ip` recorded on its split. */
TextFile.prototype.getPreferedLocation = function (pid) {
  var split = this.splits[pid];
  return split.ip;
};
|
847 |
|
/**
 * Dataset applying `mapper` to every element of its parent.
 *
 * @param {Dataset} parent - source dataset
 * @param {Function} mapper - called as mapper(element, args, global)
 * @param {*} args - passed as second argument to `mapper`
 */
function Map(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'Map';
}

util.inherits(Map, Dataset);

/** Pipeline stage: one output element per input element. */
Map.prototype.transform = function map(context, data) {
  var mapped = [];
  var k = 0;
  while (k < data.length) {
    mapped[k] = this.mapper(data[k], this.args, this.global);
    k++;
  }
  return mapped;
};
|
863 |
|
/**
 * Dataset applying `mapper` to every element of its parent and concatenating
 * the results.
 *
 * @param {Dataset} parent - source dataset
 * @param {Function} mapper - called as mapper(element, args, global)
 * @param {*} args - passed as second argument to `mapper`
 */
function FlatMap(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'FlatMap';
}

util.inherits(FlatMap, Dataset);

/** Pipeline stage: the outputs of `mapper` are concatenated into one array. */
FlatMap.prototype.transform = function flatmap(context, data) {
  var flattened = [];
  var k = 0;
  while (k < data.length) {
    var produced = this.mapper(data[k], this.args, this.global);
    flattened = flattened.concat(produced);
    k++;
  }
  return flattened;
};
|
879 |
|
/**
 * Dataset applying `mapper` to the value of every [key, value] element of its
 * parent, keys being left untouched.
 *
 * @param {Dataset} parent - source dataset
 * @param {Function} mapper - called as mapper(value, args, global)
 * @param {*} args - passed as second argument to `mapper`
 */
function MapValues(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'MapValues';
}

util.inherits(MapValues, Dataset);

/** Pipeline stage: each [key, value] becomes [key, mapper(value)]. */
MapValues.prototype.transform = function (context, data) {
  var mapped = [];
  var k = 0;
  while (k < data.length) {
    var entry = data[k];
    mapped[k] = [entry[0], this.mapper(entry[1], this.args, this.global)];
    k++;
  }
  return mapped;
};
|
895 |
|
/**
 * Dataset applying `mapper` to the value of every [key, value] element of its
 * parent and emitting one [key, item] element per item `mapper` returns.
 *
 * @param {Dataset} parent - source dataset
 * @param {Function} mapper - called as mapper(value, args, global); must
 *   return an array
 * @param {*} args - passed as second argument to `mapper`
 */
function FlatMapValues(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'FlatMapValues';
}

util.inherits(FlatMapValues, Dataset);

/** Pipeline stage: expand each [key, value] into [key, item] pairs. */
FlatMapValues.prototype.transform = function (context, data) {
  var expanded = [];
  for (var k = 0; k < data.length; k++) {
    var entry = data[k];
    var items = this.mapper(entry[1], this.args, this.global);
    var pairs = items.map(function (e) {return [entry[0], e];});
    expanded = expanded.concat(pairs);
  }
  return expanded;
};
|
913 |
|
/**
 * Dataset keeping only the elements of its parent accepted by `filter`.
 *
 * @param {Dataset} parent - source dataset
 * @param {Function} filter - called as filter(element, args, global); a
 *   truthy result keeps the element
 * @param {*} args - passed as second argument to `filter`
 */
function Filter(parent, filter, args) {
  Dataset.call(this, parent.sc, [parent]);
  this._filter = filter;
  this.args = args;
  this.type = 'Filter';
}

util.inherits(Filter, Dataset);

/** Pipeline stage: keep the elements for which the predicate is truthy. */
Filter.prototype.transform = function (context, data) {
  var kept = [];
  var k = 0;
  while (k < data.length) {
    var accepted = this._filter(data[k], this.args, this.global);
    if (accepted) {
      kept.push(data[k]);
    }
    k++;
  }
  return kept;
};
|
929 |
|
/**
 * Small seedable xorshift-style pseudo random number generator.
 *
 * @param {number} [seed=0] - added to the initial state
 */
function Random(seed) {
  seed = seed || 0;
  this.x = 123456789 + seed;
  this.y = 188675123;

  /**
   * Next integer of the sequence. The bitwise operators make it a SIGNED
   * 32-bit value, i.e. it may be negative.
   */
  this.next = function () {
    var t = this.x, u = this.y;
    t ^= t << 8;
    this.x = u;
    return this.y = (u ^ (u >> 22)) ^ (t ^ (t >> 9));
  };

  /**
   * Next float in [0, 1), like Math.random().
   *
   * The integer is reinterpreted as unsigned (`>>> 0`) before scaling:
   * dividing the signed value produced floats in [-0.5, 0.5), which skewed
   * every probability test relying on this generator.
   */
  this.nextDouble = function () {
    return (this.next() >>> 0) / 4294967296.0;
  };
}
|
949 |
|
/**
 * Poisson-distributed sampler of parameter `lambda`, drawing from a Random
 * generator seeded with `initSeed` (1 when falsy).
 *
 * sample() multiplies uniform draws together until the product is no longer
 * above exp(-lambda), and returns the number of draws minus one.
 */
function Poisson(lambda, initSeed) {
  var seed = initSeed || 1;
  var rng = new Random(seed);

  this.sample = function () {
    var threshold = Math.exp(-lambda);
    var product = 1;
    var draws = 0;

    do {
      product *= rng.nextDouble();
      draws += 1;
    } while (product > threshold);

    return draws - 1;
  };
}
|
964 |
|
/**
 * Dataset holding a random sample of its parent.
 *
 * @param {Dataset} parent - upstream dataset; its context `sc` is reused
 * @param {boolean} withReplacement - when truthy each element is emitted a
 *   Poisson-drawn number of times; otherwise each element is kept at most once
 * @param {number} frac - Poisson parameter (with replacement) or keep threshold
 *   compared against rng.nextDouble() (without replacement)
 * @param {number} seed - seed handed to the random generator
 */
function Sample(parent, withReplacement, frac, seed) {
  Dataset.call(this, parent.sc, [parent]);
  this.withReplacement = withReplacement;
  this.frac = frac;
  this.rng = withReplacement ? new Poisson(frac, seed) : new Random(seed);
  this.type = 'Sample';
}

util.inherits(Sample, Dataset);

/**
 * Sample the elements of `data`.
 *
 * @param {*} context - not used by this transform
 * @param {Array} data - elements to sample from
 * @returns {Array} dense array of the selected elements, in input order
 */
Sample.prototype.transform = function (context, data) {
  var out = [], i, j, copies;
  if (this.withReplacement) {
    for (i = 0; i < data.length; i++) {
      // Draw the repetition count once per element; drawing inside the loop
      // condition would consume a fresh random count on every iteration.
      copies = this.rng.sample();
      for (j = 0; j < copies; j++) out.push(data[i]);
    }
  } else {
    for (i = 0; i < data.length; i++) {
      // push() keeps the result dense; assigning at index i would leave holes
      // wherever an element is rejected.
      if (this.rng.nextDouble() < this.frac) out.push(data[i]);
    }
  }
  return out;
};
|
986 |
|
/**
 * Dataset made of several parent datasets.
 *
 * @param {Object} sc - context shared by the parents
 * @param {Dataset[]} parents - datasets stored as this dataset's dependencies
 */
function Union(sc, parents) {
  Dataset.call(this, sc, parents);
  this.type = 'Union';
}

util.inherits(Union, Dataset);

/**
 * Identity transform: the incoming array is handed back as is (same reference).
 *
 * @param {*} context - not used by this transform
 * @param {Array} data - incoming elements
 * @returns {Array} `data` itself
 */
Union.prototype.transform = function (context, data) {
  return data;
};
|
995 |
|
/**
 * Shuffling dataset that aggregates the values of [key, value] pairs per key.
 *
 * @param {Object} sc - context the dataset belongs to
 * @param {Dataset[]} dependencies - parent datasets
 * @param {Function} reducer - called as reducer(accumulator, value, args, global)
 *   while buffering pairs in transform()
 * @param {Function} combiner - called as combiner(acc1, acc2, args, global)
 *   while merging shuffle files in iterate()
 * @param {*} init - initial accumulator; a JSON copy is made for each new key
 * @param {*} args - extra argument forwarded to reducer and combiner
 */
function AggregateByKey(sc, dependencies, reducer, combiner, init, args) {
  Dataset.call(this, sc, dependencies);

  // User supplied callbacks and their inputs.
  this.combiner = combiner;
  this.reducer = reducer;
  this.init = init;
  this.args = args;

  // Shuffle state: `buffer` is indexed by target partition.
  this.shuffling = true;
  this.executed = false;
  this.buffer = [];

  this.type = 'AggregateByKey';
}

util.inherits(AggregateByKey, Dataset);
|
1009 |
|
/**
 * Lazily create the output partitions and the hash partitioner.
 * The partition count is the largest `nPartitions` among the dependencies.
 * Work is done on the first call only; `done` is invoked on every call.
 *
 * @param {Function} done - called with no argument once partitions exist
 */
AggregateByKey.prototype.getPartitions = function (done) {
  if (this.partitions != undefined) {
    done();
    return;
  }

  var widest = 0, n;
  var slots = {};
  this.partitions = slots;
  for (n = 0; n < this.dependencies.length; n++) {
    widest = Math.max(widest, this.dependencies[n].nPartitions);
  }
  n = 0;
  while (n < widest) {
    slots[n] = new Partition(this.id, n);
    n += 1;
  }
  this.nPartitions = widest;
  this.partitioner = new HashPartitioner(widest);
  done();
};
|
1022 |
|
/**
 * Fold incoming [key, value] pairs into the per-partition buffers.
 *
 * Buckets are indexed by the partitioner's index for the key; inside a bucket
 * accumulators are stored under the JSON text of the key. A missing
 * accumulator starts as a JSON copy of `this.init`. Nothing is returned.
 *
 * @param {*} context - not used by this transform
 * @param {Array} data - array of [key, value] pairs
 */
AggregateByKey.prototype.transform = function (context, data) {
  for (var pos = 0; pos < data.length; pos++) {
    var key = data[pos][0];
    var value = data[pos][1];
    var slot = JSON.stringify(key);
    var pid = this.partitioner.getPartitionIndex(key);

    var bucket = this.buffer[pid];
    if (bucket == undefined) bucket = this.buffer[pid] = {};
    if (bucket[slot] == undefined) bucket[slot] = JSON.parse(JSON.stringify(this.init));
    bucket[slot] = this.reducer(bucket[slot], value, this.args, this.global);
  }
};
|
1031 |
|
/**
 * Write every partition bucket to its own shuffle file, one JSON record per line.
 *
 * With a single dependency a record is [key, accumulator]. With several
 * dependencies the accumulator is placed in the first slot of a two-slot value
 * when this task's shuffle partition comes from the first dependency, otherwise
 * in the second slot; the other slot is an empty array.
 * Text is appended synchronously whenever 65536 characters or more are pending.
 *
 * @param {Object} task - provides basedir, pid, files, grid.hostname, lib.fs and lib.uuid
 * @param {Function} done - called with no argument once all files are written
 */
AggregateByKey.prototype.spillToDisk = function (task, done) {
  var paired = this.dependencies.length > 1;
  var fromFirst = paired &&
    (this.shufflePartitions[task.pid].parentDatasetId == this.dependencies[0].id);

  // Build the record written for one buffered key.
  function record(keyText, acc) {
    var key = JSON.parse(keyText);
    if (!paired) return [key, acc];
    return fromFirst ? [key, [acc, []]] : [key, [[], acc]];
  }

  for (var pid = 0; pid < this.nPartitions; pid++) {
    var pending = '';
    var target = task.basedir + 'shuffle/' + task.lib.uuid.v4();
    var bucket = this.buffer[pid];
    for (var keyText in bucket) {
      pending += JSON.stringify(record(keyText, bucket[keyText])) + '\n';
      if (pending.length < 65536) continue;
      task.lib.fs.appendFileSync(target, pending);
      pending = '';
    }
    // Always append, even an empty string, so the file exists for every partition.
    task.lib.fs.appendFileSync(target, pending);
    task.files[pid] = {host: task.grid.hostname, path: target};
  }
  done();
};
|
1069 |
|
/**
 * Read the shuffle files of partition `p` one after the other, merge the
 * accumulators of equal keys with the combiner, then push every resulting
 * [key, accumulator] pair through the pipeline as a single-element buffer.
 *
 * @param {Object} task - provides lib.Lines and getReadStream(file, opts, cb)
 * @param {number} p - index of the partition to iterate
 * @param {Array} pipeline - datasets whose transform(dataset, buffer) are chained
 * @param {Function} done - called with no argument once all pairs were processed
 * @throws the error reported by task.getReadStream, if any
 */
AggregateByKey.prototype.iterate = function (task, p, pipeline, done) {
  var self = this, cbuffer = {}, cnt = 0, files = [];

  for (var i = 0; i < self.nShufflePartitions; i++)
    files.push(self.shufflePartitions[i].files[p]);

  // No shuffle file at all: finish right away instead of trying to read
  // files[0] === undefined and never reaching the completion test.
  if (files.length === 0) {
    done();
    return;
  }

  processShuffleFile(files[cnt], processDone);

  function processShuffleFile(file, done) {
    var lines = new task.lib.Lines();
    task.getReadStream(file, undefined, function (err, stream) {
      // Surface the real failure rather than a TypeError on an undefined stream.
      if (err) throw err;
      stream.pipe(lines);
    });
    lines.on('data', function (linev) {
      for (var i = 0; i < linev.length; i++) {
        var data = JSON.parse(linev[i]), key = data[0], value = data[1], hash = JSON.stringify(key);
        if (cbuffer[hash] != undefined) cbuffer[hash] = self.combiner(cbuffer[hash], value, self.args, self.global);
        else cbuffer[hash] = value;
      }
    });
    lines.on('end', done);
  }

  function processDone() {
    if (++cnt == files.length) {
      for (var key in cbuffer) {
        var buffer = [[JSON.parse(key), cbuffer[key]]];
        for (var t = 0; t < pipeline.length; t++)
          buffer = pipeline[t].transform(pipeline[t], buffer);
      }
      done();
    } else processShuffleFile(files[cnt], processDone);
  }
};
|
1105 |
|
/**
 * Shuffling dataset pairing the elements of two parent datasets.
 *
 * @param {Object} sc - context the dataset belongs to
 * @param {Dataset[]} dependencies - parent datasets; getPartitions() reads the first two
 */
function Cartesian(sc, dependencies) {
  Dataset.call(this, sc, dependencies);

  // Shuffle state: transform() appends incoming elements to `buffer`.
  this.shuffling = true;
  this.executed = false;
  this.buffer = [];

  this.type = 'Cartesian';
}

util.inherits(Cartesian, Dataset);
|
1115 |
|
/**
 * Lazily create one output partition per (left partition, right partition) pair.
 * Records the partition counts of both parents in `pleft` and `pright`.
 * Work is done on the first call only; `done` is invoked on every call.
 *
 * @param {Function} done - called with no argument once partitions exist
 */
Cartesian.prototype.getPartitions = function (done) {
  if (this.partitions != undefined) {
    done();
    return;
  }

  this.pleft = this.dependencies[0].nPartitions;
  this.pright = this.dependencies[1].nPartitions;

  var total = this.pleft * this.pright;
  var slots = {};
  this.partitions = slots;
  this.nPartitions = total;

  var n = 0;
  while (n < total) {
    slots[n] = new Partition(this.id, n);
    n += 1;
  }
  done();
};
|
1128 |
|
/**
 * Append every incoming element to the shuffle buffer. Nothing is returned.
 *
 * @param {*} context - not used by this transform
 * @param {Array} data - elements to buffer
 */
Cartesian.prototype.transform = function (context, data) {
  var pos = 0;
  while (pos < data.length) {
    this.buffer.push(data[pos]);
    pos += 1;
  }
};
|
1132 |
|
/**
 * Write the whole buffer to a single shuffle file, one JSON value per line.
 * Text is appended synchronously whenever 65536 characters or more are pending.
 * `task.files` is replaced by a single {host, path} descriptor, which is also logged.
 *
 * @param {Object} task - provides basedir, grid.hostname, log, lib.fs and lib.uuid
 * @param {Function} done - called with no argument once the file is written
 */
Cartesian.prototype.spillToDisk = function (task, done) {
  var target = task.basedir + 'shuffle/' + task.lib.uuid.v4();
  var pending = '';
  var pos = 0;

  while (pos < this.buffer.length) {
    pending += JSON.stringify(this.buffer[pos]) + '\n';
    pos += 1;
    if (pending.length < 65536) continue;
    task.lib.fs.appendFileSync(target, pending);
    pending = '';
  }
  task.lib.fs.appendFileSync(target, pending);

  task.files = {host: task.grid.hostname, path: target};
  task.log(task.files);
  done();
};
|
1147 |
|
/**
 * Produce the pairs of output partition `p`: every record of one left shuffle
 * file combined with every record of one right shuffle file, each pair being
 * pushed through the pipeline as a single-element buffer.
 *
 * The left file index is floor(p / pright); the right file index is
 * p % pright + pleft, i.e. right entries follow the `pleft` left entries in
 * `shufflePartitions`.
 *
 * @param {Object} task - provides getReadStream(file, opts, cb)
 * @param {number} p - index of the partition to iterate
 * @param {Array} pipeline - datasets whose transform(dataset, buffer) are chained
 * @param {Function} done - called with no argument once all pairs were processed
 * @throws the error reported by task.getReadStream, if any
 */
Cartesian.prototype.iterate = function (task, p, pipeline, done) {
  var self = this;
  var leftIndex = Math.floor(p / this.pright);
  var rightIndex = p % this.pright + this.pleft;

  // Read a whole shuffle file and hand its non-empty lines to cb.
  function readLines(file, cb) {
    var text = '';
    task.getReadStream(file, undefined, function (err, stream) {
      // Surface the real failure rather than a TypeError on an undefined stream.
      if (err) throw err;
      stream.on('data', function (chunk) {text += chunk;});
      stream.on('end', function () {
        cb(text.split('\n').filter(function (line) {return line !== '';}));
      });
    });
  }

  // The right file is only opened once the left one has been read completely.
  readLines(this.shufflePartitions[leftIndex].files, function (left) {
    readLines(self.shufflePartitions[rightIndex].files, function (right) {
      for (var i = 0; i < left.length; i++) {
        for (var j = 0; j < right.length; j++) {
          // Both sides are parsed again for each pair, so every pair holds its
          // own objects (presumably so downstream mutation cannot leak between
          // pairs -- confirm before hoisting the parse).
          var buffer = [[JSON.parse(left[i]), JSON.parse(right[j])]];
          for (var t = 0; t < pipeline.length; t++)
            buffer = pipeline[t].transform(pipeline[t], buffer);
        }
      }
      done();
    });
  });
};
|
1178 |
|
/**
 * Shuffling dataset that sorts the elements of its parent by a computed key.
 *
 * @param {Object} sc - context the dataset belongs to
 * @param {Dataset} dependencies - the single parent dataset (wrapped in an array here)
 * @param {Function} keyFunc - called as keyFunc(element); returns the sort key
 * @param {boolean} [ascending=true] - sort direction; null/undefined mean ascending
 * @param {number} [numPartitions] - requested number of partitions
 */
function SortBy(sc, dependencies, keyFunc, ascending, numPartitions) {
  Dataset.call(this, sc, [dependencies]);

  this.shuffling = true;
  this.executed = false;
  this.keyFunc = keyFunc;
  this.ascending = (ascending != undefined) ? ascending : true;
  // Indexed by target partition; filled by transform().
  this.buffer = [];
  this.numPartitions = numPartitions;

  this.type = 'SortBy';
}

util.inherits(SortBy, Dataset);
|
1191 |
|
/**
 * Lazily create the output partitions and the range partitioner.
 * The partition count is the larger of the requested count (at least 1) and
 * the parent's `nPartitions`. On the first call `done` is handed to the
 * partitioner's init(); on later calls it is invoked directly.
 *
 * @param {Function} done - completion callback
 */
SortBy.prototype.getPartitions = function (done) {
  if (this.partitions != undefined) {
    done();
    return;
  }

  var total = Math.max(this.numPartitions || 1, this.dependencies[0].nPartitions);
  var slots = {};
  this.partitions = slots;
  this.nPartitions = total;
  for (var n = 0; n < total; n++) slots[n] = new Partition(this.id, n);

  this.partitioner = new RangePartitioner(total, this.keyFunc, this.dependencies[0]);
  this.partitioner.init(done);
};
|
1203 |
|
/**
 * Route every incoming element to the bucket of the partition chosen by the
 * partitioner for the element's sort key. Nothing is returned.
 *
 * NOTE(review): the partition index comes straight from the partitioner and
 * does not depend on `this.ascending` -- confirm that descending sorts end up
 * correctly ordered across partitions.
 *
 * @param {*} context - not used by this transform
 * @param {Array} data - elements to buffer
 */
SortBy.prototype.transform = function (context, data) {
  for (var pos = 0; pos < data.length; pos++) {
    var element = data[pos];
    var pid = this.partitioner.getPartitionIndex(this.keyFunc(element));
    var bucket = this.buffer[pid];
    if (bucket == undefined) bucket = this.buffer[pid] = [];
    bucket.push(element);
  }
};
|
1211 |
|
/**
 * Write every partition bucket to its own shuffle file, one JSON value per line.
 * A file is created for each partition, even when its bucket is missing.
 * Text is appended synchronously whenever 65536 characters or more are pending.
 *
 * @param {Object} task - provides basedir, files, grid.hostname, lib.fs and lib.uuid
 * @param {Function} done - called with no argument once all files are written
 */
SortBy.prototype.spillToDisk = function (task, done) {
  var pid = 0;
  while (pid < this.nPartitions) {
    var pending = '';
    var target = task.basedir + 'shuffle/' + task.lib.uuid.v4();
    var rows = this.buffer[pid];
    if (rows != undefined) {
      for (var r = 0; r < rows.length; r++) {
        pending += JSON.stringify(rows[r]) + '\n';
        if (pending.length < 65536) continue;
        task.lib.fs.appendFileSync(target, pending);
        pending = '';
      }
    }
    task.lib.fs.appendFileSync(target, pending);
    task.files[pid] = {host: task.grid.hostname, path: target};
    pid += 1;
  }
  done();
};
|
1229 |
|
/**
 * Read the shuffle files of partition `p` one after the other, sort all their
 * records by key, then push each record through the pipeline as a
 * single-element buffer.
 *
 * @param {Object} task - provides lib.Lines and getReadStream(file, opts, cb)
 * @param {number} p - index of the partition to iterate
 * @param {Array} pipeline - datasets whose transform(dataset, buffer) are chained
 * @param {Function} done - called with no argument once all records were processed
 * @throws the error reported by task.getReadStream, if any
 */
SortBy.prototype.iterate = function (task, p, pipeline, done) {
  var self = this, cbuffer = [], cnt = 0, files = [];

  for (var i = 0; i < self.nShufflePartitions; i++)
    files.push(self.shufflePartitions[i].files[p]);

  // No shuffle file at all: finish right away instead of trying to read
  // files[0] === undefined and never reaching the completion test.
  if (files.length === 0) {
    done();
    return;
  }

  processShuffleFile(files[cnt], processDone);

  function processShuffleFile(file, done) {
    var lines = new task.lib.Lines();
    task.getReadStream(file, undefined, function (err, stream) {
      // Surface the real failure rather than a TypeError on an undefined stream.
      if (err) throw err;
      stream.pipe(lines);
    });
    lines.on('data', function (linev) {
      for (var i = 0; i < linev.length; i++)
        cbuffer.push(JSON.parse(linev[i]));
    });
    lines.on('end', done);
  }

  // Order two records by their keys, honouring the requested direction.
  // Each key is computed once per comparison.
  function compare(a, b) {
    var ka = self.keyFunc(a), kb = self.keyFunc(b);
    if (ka < kb) return self.ascending ? -1 : 1;
    if (ka > kb) return self.ascending ? 1 : -1;
    return 0;
  }

  function processDone() {
    if (++cnt == files.length) {
      cbuffer.sort(compare);
      for (var i = 0; i < cbuffer.length; i++) {
        var buffer = [cbuffer[i]];
        for (var t = 0; t < pipeline.length; t++)
          buffer = pipeline[t].transform(pipeline[t], buffer);
      }
      done();
    } else processShuffleFile(files[cnt], processDone);
  }
};
|
1268 |
|
/**
 * Shuffling dataset that redistributes [key, value] pairs with a given partitioner.
 *
 * @param {Object} sc - context the dataset belongs to
 * @param {Dataset} dependencies - the single parent dataset (wrapped in an array here)
 * @param {Object} partitioner - provides numPartitions, getPartitionIndex(key)
 *   and optionally init(done)
 */
function PartitionBy(sc, dependencies, partitioner) {
  Dataset.call(this, sc, [dependencies]);

  this.shuffling = true;
  this.executed = false;
  // Indexed by target partition; filled by transform().
  this.buffer = [];
  this.partitioner = partitioner;

  this.type = 'PartitionBy';
}

util.inherits(PartitionBy, Dataset);
|
1279 |
|
/**
 * Lazily create one output partition per partition of the partitioner.
 * On the first call `done` is handed to the partitioner's init() when it has
 * one; otherwise, and on every later call, `done` is invoked directly.
 *
 * @param {Function} done - completion callback
 */
PartitionBy.prototype.getPartitions = function (done) {
  if (this.partitions != undefined) {
    done();
    return;
  }

  var total = this.partitioner.numPartitions;
  var slots = {};
  this.partitions = slots;
  this.nPartitions = total;
  for (var n = 0; n < total; n++) slots[n] = new Partition(this.id, n);

  if (!this.partitioner.init) {
    done();
    return;
  }
  this.partitioner.init(done);
};
|
1290 |
|
/**
 * Route every incoming [key, value] pair to the bucket of the partition chosen
 * by the partitioner for its key. Nothing is returned.
 *
 * @param {*} context - not used by this transform
 * @param {Array} data - array of [key, value] pairs
 */
PartitionBy.prototype.transform = function (context, data) {
  for (var pos = 0; pos < data.length; pos++) {
    var pid = this.partitioner.getPartitionIndex(data[pos][0]);
    var bucket = this.buffer[pid];
    if (bucket == undefined) bucket = this.buffer[pid] = [];
    bucket.push(data[pos]);
  }
};
|
1298 |
|
/**
 * Write every partition bucket to its own shuffle file, one JSON value per line.
 * A file is created for each partition, even when its bucket is missing.
 * Text is appended synchronously whenever 65536 characters or more are pending.
 *
 * @param {Object} task - provides basedir, files, grid.hostname, lib.fs and lib.uuid
 * @param {Function} done - called with no argument once all files are written
 */
PartitionBy.prototype.spillToDisk = function (task, done) {
  var pid = 0;
  while (pid < this.nPartitions) {
    var pending = '';
    var target = task.basedir + 'shuffle/' + task.lib.uuid.v4();
    var rows = this.buffer[pid];
    if (rows != undefined) {
      for (var r = 0; r < rows.length; r++) {
        pending += JSON.stringify(rows[r]) + '\n';
        if (pending.length < 65536) continue;
        task.lib.fs.appendFileSync(target, pending);
        pending = '';
      }
    }
    task.lib.fs.appendFileSync(target, pending);
    task.files[pid] = {host: task.grid.hostname, path: target};
    pid += 1;
  }
  done();
};
|
1316 |
|
/**
 * Read the shuffle files of partition `p` one after the other, then push each
 * record, in read order, through the pipeline as a single-element buffer.
 *
 * @param {Object} task - provides lib.Lines and getReadStream(file, opts, cb)
 * @param {number} p - index of the partition to iterate
 * @param {Array} pipeline - datasets whose transform(dataset, buffer) are chained
 * @param {Function} done - called with no argument once all records were processed
 * @throws the error reported by task.getReadStream, if any
 */
PartitionBy.prototype.iterate = function (task, p, pipeline, done) {
  var self = this, cbuffer = [], cnt = 0, files = [];

  for (var i = 0; i < self.nShufflePartitions; i++)
    files.push(self.shufflePartitions[i].files[p]);

  // No shuffle file at all: finish right away instead of trying to read
  // files[0] === undefined and never reaching the completion test.
  if (files.length === 0) {
    done();
    return;
  }

  processShuffleFile(files[cnt], processDone);

  function processShuffleFile(file, done) {
    var lines = new task.lib.Lines();
    task.getReadStream(file, undefined, function (err, stream) {
      // Surface the real failure rather than a TypeError on an undefined stream.
      if (err) throw err;
      stream.pipe(lines);
    });
    lines.on('data', function (linev) {
      for (var i = 0; i < linev.length; i++)
        cbuffer.push(JSON.parse(linev[i]));
    });
    lines.on('end', done);
  }

  function processDone() {
    if (++cnt == files.length) {
      for (var i = 0; i < cbuffer.length; i++) {
        var buffer = [cbuffer[i]];
        for (var t = 0; t < pipeline.length; t++)
          buffer = pipeline[t].transform(pipeline[t], buffer);
      }
      done();
    } else processShuffleFile(files[cnt], processDone);
  }
};
|
1348 |
|
/**
 * Partitioner assigning keys to contiguous key ranges.
 *
 * init() samples the dataset, extracts and sorts the keys of the sample and
 * keeps up to numPartitions - 1 of them as range upper bounds.
 * getPartitionIndex() then returns the index of the first bound strictly
 * greater than a key.
 *
 * @param {number} numPartitions - number of target partitions
 * @param {Function} keyFunc - called as keyFunc(element); returns the element's key
 * @param {Dataset} dataset - dataset whose sample provides the bounds
 */
function RangePartitioner(numPartitions, keyFunc, dataset) {
  this.numPartitions = numPartitions;

  /**
   * Compute `this.upperbounds` from a sample of the dataset.
   *
   * @param {Function} done - called with no argument once the bounds are set
   * @throws the error reported by collect(), if any
   */
  this.init = function (done) {
    var self = this;
    dataset.sample(false, 0.5).collect(function (err, result) {
      // Surface the real failure rather than a TypeError on an undefined result.
      if (err) throw err;

      // Bounds must be keys, not whole elements: getPartitionIndex() receives
      // keys from its callers and compares them directly with the bounds.
      var keys = result.map(function (element) {return keyFunc(element);});
      keys.sort(function (a, b) {
        if (a < b) return -1;
        if (a > b) return 1;
        return 0;
      });

      if (keys.length <= numPartitions - 1) {
        // Too few samples to pick from: every sampled key becomes a bound.
        self.upperbounds = keys;
      } else {
        var step = Math.floor(keys.length / numPartitions);
        self.upperbounds = [];
        for (var i = 0; i < numPartitions - 1; i++) self.upperbounds.push(keys[step * (i + 1)]);
      }
      done();
    });
  };

  /**
   * @param {*} data - key to place
   * @returns {number} index of the first upper bound greater than the key, or
   *   the number of bounds when no bound is greater
   */
  this.getPartitionIndex = function (data) {
    for (var i = 0; i < this.upperbounds.length; i++)
      if (data < this.upperbounds[i]) break;
    return i;
  };
}
|
1378 |
|
/**
 * Partitioner spreading keys over partitions by hashing their string form.
 *
 * @param {number} numPartitions - number of target partitions
 */
function HashPartitioner(numPartitions) {
  this.numPartitions = numPartitions;
  this.type = 'HashPartitioner';
}

/**
 * Hash the string form of a value with a 31-multiplier rolling hash kept in 32 bits.
 *
 * @param {*} o - value to hash; null and undefined hash like 'null' and 'undefined'
 * @returns {number} non-negative integer hash
 */
HashPartitioner.prototype.hash = function (o) {
  // String(o) accepts null and undefined, on which o.toString() would throw.
  var i, h = 0, s = String(o), len = s.length;
  for (i = 0; i < len; i++) {
    h = ((h << 5) - h) + s.charCodeAt(i);
    h = h & h; // truncate to a 32-bit integer
  }
  return Math.abs(h);
};

/**
 * @param {*} data - key to place
 * @returns {number} hash of the key modulo the number of partitions
 */
HashPartitioner.prototype.getPartitionIndex = function (data) {
  return this.hash(data) % this.numPartitions;
};
|
1396 |
|
1397 | module.exports = {
|
1398 | Dataset: Dataset,
|
1399 | Partition: Partition,
|
1400 | parallelize: parallelize,
|
1401 | range: range,
|
1402 | GzipFile: GzipFile,
|
1403 | TextFile: TextFile,
|
1404 | TextDir: TextDir,
|
1405 | TextS3Dir: TextS3Dir,
|
1406 | TextS3File: TextS3File,
|
1407 | Source: Source,
|
1408 | Stream: Stream,
|
1409 | Random: Random,
|
1410 | Map: Map,
|
1411 | FlatMap: FlatMap,
|
1412 | MapValues: MapValues,
|
1413 | FlatMapValues: FlatMapValues,
|
1414 | Filter: Filter,
|
1415 | Sample: Sample,
|
1416 | Union: Union,
|
1417 | AggregateByKey: AggregateByKey,
|
1418 | Cartesian: Cartesian,
|
1419 | SortBy: SortBy,
|
1420 | PartitionBy: PartitionBy,
|
1421 | RangePartitioner: RangePartitioner,
|
1422 | HashPartitioner: HashPartitioner
|
1423 | };
|