// Copyright 2016 Luca-SAS, licensed under the Apache License 2.0

'use strict';

var fs = require('fs');
var path = require('path');
var url = require('url');
var util = require('util');
var stream = require('stream');
var thenify = require('thenify').withCallback;
var uuid = require('uuid');
var splitLocalFile = require('./readsplit.js').splitLocalFile;
var splitHDFSFile = require('./readsplit.js').splitHDFSFile;
var AWS = require('aws-sdk');
var merge2 = require('merge2');

function Dataset(sc, dependencies) {
  this.id = sc.datasetIdCounter++;
  this.dependencies = dependencies || [];
  this.persistent = false;
  this.sc = sc;
}

Dataset.prototype.persist = function () {this.persistent = true; return this;};

Dataset.prototype.map = function (mapper, args) {return new Map(this, mapper, args);};

Dataset.prototype.flatMap = function (mapper, args) {return new FlatMap(this, mapper, args);};

Dataset.prototype.mapValues = function (mapper, args) {return new MapValues(this, mapper, args);};

Dataset.prototype.flatMapValues = function (mapper, args) {return new FlatMapValues(this, mapper, args);};

Dataset.prototype.filter = function (filter, args) {return new Filter(this, filter, args);};

Dataset.prototype.sample = function (withReplacement, frac, seed) {return new Sample(this, withReplacement, frac, seed || 1);};

Dataset.prototype.union = function (other) {return (other.id == this.id) ? this : new Union(this.sc, [this, other]);};

Dataset.prototype.aggregateByKey = function (reducer, combiner, init, args) {
  if (arguments.length < 3) throw new Error('Missing argument for function aggregateByKey().');
  return new AggregateByKey(this.sc, [this], reducer, combiner, init, args);
};

Dataset.prototype.reduceByKey = function (reducer, init, args) {
  if (arguments.length < 2) throw new Error('Missing argument for function reduceByKey().');
  return new AggregateByKey(this.sc, [this], reducer, reducer, init, args);
};

Dataset.prototype.groupByKey = function () {
  function reducer(a, b) {a.push(b); return a;}
  function combiner(a, b) {return a.concat(b);}
  return new AggregateByKey(this.sc, [this], reducer, combiner, [], {});
};
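
// Example (hypothetical usage; assumes a skale context `sc` created by the
// caller, e.g. through the engine's context() factory):
//
//   sc.parallelize([['a', 1], ['a', 2], ['b', 3]])
//     .reduceByKey(function (a, b) {return a + b;}, 0)
//     .collect(function (err, res) {
//       console.log(res);   // [['a', 3], ['b', 3]] (partition order not guaranteed)
//     });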

Dataset.prototype.coGroup = function (other) {
  function reducer(a, b) {a.push(b); return a;}
  function combiner(a, b) {
    for (var i = 0; i < b.length; i++) a[i] = a[i].concat(b[i]);
    return a;
  }
  return new AggregateByKey(this.sc, [this, other], reducer, combiner, [], {});
};

Dataset.prototype.cartesian = function (other) {return new Cartesian(this.sc, [this, other]);};

Dataset.prototype.sortBy = function (sorter, ascending, numPartitions) {
  return new SortBy(this.sc, this, sorter, ascending, numPartitions);
};

Dataset.prototype.partitionBy = function (partitioner) {
  return new PartitionBy(this.sc, this, partitioner);
};

Dataset.prototype.sortByKey = function (ascending, numPartitions) {
  return new SortBy(this.sc, this, function (data) {return data[0];}, ascending, numPartitions);
};

Dataset.prototype.join = function (other) {
  return this.coGroup(other).flatMapValues(function (v) {
    var res = [];
    for (var i in v[0])
      for (var j in v[1])
        res.push([v[0][i], v[1][j]]);
    return res;
  });
};

Dataset.prototype.leftOuterJoin = function (other) {
  return this.coGroup(other).flatMapValues(function (v) {
    var res = [], i, j;
    if (v[1].length == 0) {
      for (i in v[0]) res.push([v[0][i], null]);
    } else {
      for (i in v[0])
        for (j in v[1]) res.push([v[0][i], v[1][j]]);
    }
    return res;
  });
};

Dataset.prototype.rightOuterJoin = function (other) {
  return this.coGroup(other).flatMapValues(function (v) {
    var res = [], i, j;
    if (v[0].length == 0) {
      for (i in v[1]) res.push([null, v[1][i]]);
    } else {
      for (i in v[0])
        for (j in v[1]) res.push([v[0][i], v[1][j]]);
    }
    return res;
  });
};
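
// Example (hypothetical usage; assumes a skale context `sc`): the joins
// above combine pair datasets by key, building on coGroup.
//
//   var left = sc.parallelize([[1, 'a'], [2, 'b']]);
//   var right = sc.parallelize([[1, 'x'], [3, 'y']]);
//   left.join(right).collect(function (err, res) {
//     console.log(res);   // [[1, ['a', 'x']]]
//   });
//   left.leftOuterJoin(right).collect(function (err, res) {
//     console.log(res);   // [[1, ['a', 'x']], [2, ['b', null]]]
//   });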

Dataset.prototype.distinct = function () {
  return this.map(function (e) {return [e, null];})
    .reduceByKey(function (a) {return a;}, null)
    .map(function (a) {return a[0];});
};

Dataset.prototype.intersection = function (other) {
  function mapper(e) {return [e, 0];}
  function reducer(a) {return a + 1;}
  var a = this.map(mapper).reduceByKey(reducer, 0);
  var b = other.map(mapper).reduceByKey(reducer, 0);
  return a.coGroup(b).flatMap(function (a) {
    return (a[1][0].length && a[1][1].length) ? [a[0]] : [];
  });
};

Dataset.prototype.subtract = function (other) {
  function mapper(e) {return [e, 0];}
  function reducer(a) {return a + 1;}
  var a = this.map(mapper).reduceByKey(reducer, 0);
  var b = other.map(mapper).reduceByKey(reducer, 0);
  return a.coGroup(b).flatMap(function (a) {
    var res = [];
    if (a[1][0].length && (a[1][1].length == 0))
      for (var i = 0; i < a[1][0][0]; i++) res.push(a[0]);
    return res;
  });
};

Dataset.prototype.keys = function () {return this.map(function (a) {return a[0];});};

Dataset.prototype.values = function () {return this.map(function (a) {return a[1];});};

Dataset.prototype.lookup = thenify(function (key, done) {
  return this.filter(function (kv, args) {return kv[0] === args.key;}, {key: key})
    .map(function (kv) {return kv[1];}).collect(done);
});

Dataset.prototype.countByValue = thenify(function (done) {
  return this.map(function (e) {return [e, 1];})
    .reduceByKey(function (a, b) {return a + b;}, 0)
    .collect(done);
});

Dataset.prototype.countByKey = thenify(function (done) {
  return this.mapValues(function () {return 1;})
    .reduceByKey(function (a, b) {return a + b;}, 0)
    .collect(done);
});

Dataset.prototype.collect = thenify(function (done) {
  var reducer = function (a, b) {a.push(b); return a;};
  var combiner = function (a, b) {return a.concat(b);};
  var init = [], action = {args: [], src: reducer, init: init, opt: {}}, self = this;

  return this.sc.runJob({}, this, action, function (job, tasks) {
    var result = JSON.parse(JSON.stringify(init)), cnt = 0;
    function taskDone(err, res) {
      result = combiner(result, res.data);
      if (++cnt < tasks.length) return self.sc.runTask(tasks[cnt], taskDone);
      self.sc.log('result stage done');
      done(err, result);
    }

    self.sc.runTask(tasks[cnt], taskDone);
  });
});
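
// Every action wrapped in thenify() above also returns a promise when
// called without a callback. Example (hypothetical usage; assumes a skale
// context `sc`):
//
//   sc.range(5).collect().then(function (res) {
//     console.log(res);   // [0, 1, 2, 3, 4]
//   });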

// The stream action allows the master to return a dataset as a stream.
// Each worker spills its partitions to disk, then the master pipes each
// remote partition file into the output stream.
Dataset.prototype.stream = function (options) {
  options = options || {};
  var self = this;
  var outStream = merge2();
  var opt = {
    gzip: options.gzip,
    _preIterate: function (opt, wc, p) {
      var suffix = opt.gzip ? '.gz' : '';
      wc.exportFile = wc.basedir + 'export/' + p + suffix;
    },
    _postIterate: function (acc, opt, wc, p, done) {
      var fs = wc.lib.fs;
      var zlib = wc.lib.zlib;
      if (opt.gzip) {
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536,
          level: zlib.Z_BEST_SPEED
        }));
      } else {
        fs.appendFileSync(wc.exportFile, acc);
      }
      done(wc.exportFile);
    }
  };
  var pstreams = [];

  function reducer(acc, val, opt, wc) {
    acc = acc.concat(JSON.stringify(val) + '\n');
    if (acc.length >= 65536) {
      var fs = wc.lib.fs;
      if (opt.gzip) {
        var zlib = wc.lib.zlib;
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536,
          level: zlib.Z_BEST_SPEED
        }));
      } else {
        fs.appendFileSync(wc.exportFile, acc);
      }
      acc = '';
    }
    return acc;
  }

  function combiner(acc1, acc2) {
    var p = acc2.path.match(/.+\/([0-9]+)/)[1];
    pstreams[p] = self.sc.getReadStreamSync(acc2);
  }

  this.aggregate(reducer, combiner, '', opt, function () {
    for (var i = 0; i < pstreams.length; i++)
      outStream.add(pstreams[i]);
  });

  if (options.end) outStream.on('end', self.sc.end);
  return outStream;
};
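
// Example (hypothetical usage; assumes a skale context `sc`): pipe a
// dataset to stdout as newline-delimited JSON.
//
//   sc.range(10).map(function (x) {return x * x;})
//     .stream({end: true})
//     .pipe(process.stdout);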

// In the save action, each worker exports its dataset partitions to a
// destination: a directory on the master, a remote S3 bucket, a database, etc.
// The format is JSON, one object per dataset element (dataset = stream of JSON).
//
// Step 1: the partition is spilled to disk (during pipelining)
// Step 2: the partition file is streamed from disk to the destination
//         (at the end of the pipeline)
// This is necessary because all pipeline functions are synchronous,
// and it avoids back pressure while streaming out.
//
Dataset.prototype.save = thenify(function (path, options, done) {
  options = options || {};
  if (arguments.length < 3) done = options;
  var opt = {
    gzip: options.gzip,
    path: path,
    _preIterate: function (opt, wc, p) {
      var suffix = opt.gzip ? '.gz' : '';
      wc.exportFile = wc.basedir + 'export/' + p + suffix;
    },
    _postIterate: function (acc, opt, wc, p, done) {
      var suffix = opt.gzip ? '.gz' : '';
      var fs = wc.lib.fs;
      var zlib = wc.lib.zlib;
      var url, readStream, writeStream;
      if (opt.gzip)
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536,
          level: zlib.Z_BEST_SPEED
        }));
      else
        fs.appendFileSync(wc.exportFile, acc);
      readStream = fs.createReadStream(wc.exportFile);
      url = wc.lib.url.parse(opt.path);
      switch (url.protocol) {
      case 's3:':
        var s3 = new wc.lib.AWS.S3({
          httpOptions: {timeout: 3600000},
          signatureVersion: 'v4'
        });
        s3.upload({
          Bucket: url.host,
          Key: url.path.slice(1) + '/' + p + suffix,
          Body: readStream
        }, function (err, data) {
          if (err) wc.log('S3 upload error', err);
          wc.log('upload to s3', data);
          done();
        });
        break;
      case 'file:':
      case null:
        wc.lib.mkdirp.sync(opt.path);
        writeStream = fs.createWriteStream(url.path + '/' + p + suffix);
        readStream.pipe(writeStream);
        writeStream.on('close', done);
        break;
      default:
        wc.log('Error: unsupported protocol', url.protocol);
        done();
      }
    }
  };

  function reducer(acc, val, opt, wc) {
    acc = acc.concat(JSON.stringify(val) + '\n');
    if (acc.length >= 65536) {
      var fs = wc.lib.fs;
      if (opt.gzip) {
        var zlib = wc.lib.zlib;
        fs.appendFileSync(wc.exportFile, zlib.gzipSync(acc, {
          chunkSize: 65536,
          level: zlib.Z_BEST_SPEED
        }));
      } else {
        fs.appendFileSync(wc.exportFile, acc);
      }
      acc = '';
    }
    return acc;
  }

  return this.aggregate(reducer, function () {}, '', opt, done);
});
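
// Example (hypothetical usage; assumes a skale context `sc` and write
// access to the illustrative target paths):
//
//   sc.range(100).save('/tmp/out');   // one JSON-lines file per partition
//   sc.range(100).save('s3://my-bucket/out', {gzip: true}, function (err) {
//     if (err) console.error(err);
//   });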

Dataset.prototype.first = thenify(function (done) {
  return this.take(1, function (err, res) {
    if (res) done(err, res[0]);
    else done(err);
  });
});

Dataset.prototype.take = thenify(function (N, done) {
  var reducer = function (a, b) {a.push(b); return a;};
  var combiner = function (a, b) {return a.concat(b);};
  var init = [], action = {args: [], src: reducer, init: init, opt: {}}, self = this;

  return this.sc.runJob({}, this, action, function (job, tasks) {
    var result = JSON.parse(JSON.stringify(init)), cnt = 0;

    function taskDone(err, res) {
      result = combiner(result, res.data);
      if (++cnt < tasks.length && result.length < N) self.sc.runTask(tasks[cnt], taskDone);
      else done(err, result.slice(0, N));
    }

    self.sc.runTask(tasks[cnt], taskDone);
  });
});

Dataset.prototype.top = thenify(function (N, done) {
  var reducer = function (a, b) {a.push(b); return a;};
  var combiner = function (a, b) {return b.concat(a);};
  var init = [], action = {args: [], src: reducer, init: init, opt: {}}, self = this;

  return this.sc.runJob({}, this, action, function (job, tasks) {
    var result = JSON.parse(JSON.stringify(init)), cnt = tasks.length - 1;

    // Walk partitions from the last one backwards, stopping as soon as
    // enough elements have been gathered, mirroring take() above.
    function taskDone(err, res) {
      result = combiner(result, res.data);
      if (--cnt >= 0 && result.length < N) self.sc.runTask(tasks[cnt], taskDone);
      else done(err, result.slice(-N));
    }

    self.sc.runTask(tasks[cnt], taskDone);
  });
});

Dataset.prototype.aggregate = thenify(function (reducer, combiner, init, opt, done) {
  opt = opt || {};
  var action = {args: [], src: reducer, init: init, opt: opt}, self = this;

  if (arguments.length < 5) done = opt;

  return this.sc.runJob(opt, this, action, function (job, tasks) {
    var result = JSON.parse(JSON.stringify(init));
    var nworker = self.sc.worker.length;
    var index = 0;
    var busy = 0;
    var complete = 0;
    var error;

    function runNext() {
      while (busy < nworker && index < tasks.length) {
        busy++;
        self.sc.runTask(tasks[index++], function (err, res, task) {
          result = combiner(result, res.data, opt);
          complete++;
          busy--;
          self.sc.log('part ' + task.pid + ' done. Complete: ' + complete + '/' + tasks.length);
          if (complete === tasks.length) return done(error, result);
          runNext();
        });
      }
    }

    runNext();
  });
});
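
// Example (hypothetical usage; assumes a skale context `sc`): compute a
// sum and a count in a single pass, following the same call pattern as
// count() below.
//
//   sc.parallelize([3, 1, 4, 1, 5]).aggregate(
//     function (acc, x) {return [acc[0] + x, acc[1] + 1];},   // per-element reducer
//     function (a, b) {return [a[0] + b[0], a[1] + b[1]];},   // per-partition combiner
//     [0, 0],
//     function (err, res) {
//       console.log(res);   // [14, 5]
//     });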

Dataset.prototype.reduce = thenify(function (reducer, init, opt, done) {
  opt = opt || {};
  if (arguments.length < 4) done = opt;
  return this.aggregate(reducer, reducer, init, opt, done);
});

Dataset.prototype.count = thenify(function (done) {
  return this.aggregate(function (a) {return a + 1;}, function (a, b) {return a + b;}, 0, done);
});

Dataset.prototype.forEach = thenify(function (eacher, opt, done) {
  var arg = {opt: opt, _foreach: true};
  if (arguments.length < 3) done = opt;
  return this.aggregate(eacher, function () {return null;}, null, arg, done);
});

Dataset.prototype.getPartitions = function (done) {
  if (this.partitions == undefined) {
    this.partitions = {};
    var cnt = 0;
    for (var i = 0; i < this.dependencies.length; i++) {
      for (var j = 0; j < this.dependencies[i].nPartitions; j++) {
        this.partitions[cnt] = new Partition(this.id, cnt, this.dependencies[i].id, this.dependencies[i].partitions[j].partitionIndex);
        cnt++;
      }
    }
    this.nPartitions = cnt;
  }
  done();
};

Dataset.prototype.getPreferedLocation = function () {return [];};

function Partition(datasetId, partitionIndex, parentDatasetId, parentPartitionIndex) {
  this.data = [];
  this.datasetId = datasetId;
  this.partitionIndex = partitionIndex;
  this.parentDatasetId = parentDatasetId;
  this.parentPartitionIndex = parentPartitionIndex;
  this.type = 'Partition';
  //this.count = 0;
  //this.bsize = 0;  // TODO: mv in worker only. estimated size of memory increment per period
  //this.tsize = 0;  // TODO: mv in worker only. estimated total partition size
  //this.skip = false; // TODO: mv in worker only. true when partition is evicted due to memory shortage
}

Partition.prototype.transform = function (context, data) {
  if (this.skip) return data; // Passthrough if partition is evicted

  // Periodically check/update available memory, and evict partition
  // if necessary. In this case it will be recomputed if required by
  // a future action.
  if (this.count++ == 9999) {
    this.count = 0;
    if (this.bsize == 0) this.bsize = this.mm.sizeOf(this.data);
    this.tsize += this.bsize;
    this.mm.storageMemory += this.bsize;
    if (this.mm.storageFull()) {
      console.log('# Out of memory, evict partition', this.partitionIndex);
      this.skip = true;
      this.mm.storageMemory -= this.tsize;
      this.data = [];
      this.mm.unregister(this);
      return data;
    }
  }

  // Perform persistence of partition in memory here
  for (var i = 0; i < data.length; i++) this.data.push(data[i]);
  return data;
};

Partition.prototype.iterate = function (task, p, pipeline, done) {
  var buffer;

  for (var i = 0; i < this.data.length; i++) {
    buffer = [this.data[i]];
    for (var t = 0; t < pipeline.length; t++)
      buffer = pipeline[t].transform(pipeline[t], buffer);
  }
  done();
};

function Source(sc, N, getItem, args, npart) {
  Dataset.call(this, sc);
  this.getItem = getItem;
  this.npart = npart;
  this.N = N;
  this.args = args;
  this.type = 'Source';
}
util.inherits(Source, Dataset);

Source.prototype.iterate = function (task, p, pipeline, done) {
  var buffer, i, index = this.bases[p], n = this.sizes[p];

  for (i = 0; i < n; i++, index++) {
    buffer = [this.getItem(index, this.args, task)];
    for (var t = 0; t < pipeline.length; t++)
      buffer = pipeline[t].transform(pipeline[t], buffer);
  }
  done();
};

Source.prototype.getPartitions = function (done) {
  var P = this.npart || this.sc.worker.length;
  var N = this.N;
  var plen = Math.ceil(N / P);
  var i, max;
  this.partitions = {};
  this.sizes = {};
  this.bases = {};
  this.nPartitions = P;
  for (i = 0, max = plen; i < P; i++, max += plen) {
    this.partitions[i] = new Partition(this.id, i);
    this.sizes[i] = (max <= N) ? plen : (max - N < plen) ? N - (plen * i) : 0;
    this.bases[i] = i ? this.bases[i - 1] + this.sizes[i - 1] : 0;
  }
  done();
};

function parallelize(sc, localArray, P) {
  if (!(localArray instanceof Array))
    throw new Error('First argument of parallelize() is not an instance of Array.');

  return new Source(sc, localArray.length, function (i, a) {return a[i];}, localArray, P);
}

function range(sc, start, end, step, P) {
  if (end === undefined) { end = start; start = 0; }
  if (step === undefined) step = 1;

  return new Source(sc, Math.ceil((end - start) / step), function (i, a) {
    return i * a.step + a.start;
  }, {step: step, start: start}, P);
}
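
// Example (hypothetical usage; assumes a skale context `sc` exposing these
// helpers as sc.parallelize() and sc.range()):
//
//   sc.parallelize(['a', 'b', 'c'], 2)      // 2 partitions
//     .collect().then(console.log);         // ['a', 'b', 'c']
//   sc.range(0, 10, 2)
//     .collect().then(console.log);         // [0, 2, 4, 6, 8]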

function Obj2line() {
  stream.Transform.call(this, {objectMode: true});
}
util.inherits(Obj2line, stream.Transform);

Obj2line.prototype._transform = function (chunk, encoding, done) {
  done(null, JSON.stringify(chunk) + '\n');
};

function Stream(sc, stream, type) { // type = 'line' or 'object'
  var id = uuid.v4();
  var tmpFile = sc.basedir + 'tmp/' + id;
  var targetFile = sc.basedir + 'stream/' + id;
  var out = fs.createWriteStream(tmpFile);
  var dataset = sc.textFile(targetFile);

  dataset.watched = true; // notify skale to wait for the file before launching
  dataset.parse = type == 'object';
  out.on('close', function () {
    fs.renameSync(tmpFile, targetFile);
    dataset.watched = false;
  });
  if (type == 'object')
    stream.pipe(new Obj2line()).pipe(out);
  else
    stream.pipe(out);
  return dataset;
}
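
// Example (hypothetical usage; assumes a skale context `sc` that wraps this
// constructor, e.g. as sc.lineStream()): expose a live input stream as a
// dataset backed by a watched file.
//
//   var lines = sc.lineStream(process.stdin);   // dataset of input lines
//   lines.count().then(console.log);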

function GzipFile(sc, file) {
  Dataset.call(this, sc);
  this.file = file;
  this.type = 'GzipFile';
}

util.inherits(GzipFile, Dataset);

GzipFile.prototype.getPartitions = function (done) {
  this.partitions = {0: new Partition(this.id, 0)};
  this.nPartitions = 1;
  done();
};

GzipFile.prototype.iterate = function (task, p, pipeline, done) {
  var rs = task.lib.fs.createReadStream(this.file).pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));
  var tail = '';

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function () {
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};

function TextS3File(sc, file) {
  Dataset.call(this, sc);
  var _a = file.split('/');
  this.bucket = _a[0];
  this.path = _a.slice(1).join('/');
  this.type = 'TextS3File';
}

util.inherits(TextS3File, Dataset);

TextS3File.prototype.getPartitions = function (done) {
  this.partitions = {0: new Partition(this.id, 0)};
  this.nPartitions = 1;
  done();
};

TextS3File.prototype.iterate = function (task, p, pipeline, done) {
  var s3 = new task.lib.AWS.S3({signatureVersion: 'v4'});
  var rs = s3.getObject({Bucket: this.bucket, Key: this.path}).createReadStream();
  var tail = '';

  if (this.path.slice(-3) === '.gz')
    rs = rs.pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function () {
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};

function TextS3Dir(sc, dir, options) {
  Dataset.call(this, sc);
  var _a = dir.split('/');
  this.bucket = _a[0];
  this.prefix = _a.slice(1).join('/');
  this.type = 'TextS3Dir';
  this.options = options || {};
}

util.inherits(TextS3Dir, Dataset);

TextS3Dir.prototype.getPartitions = function (done) {
  var self = this;
  var s3 = new AWS.S3({signatureVersion: 'v4'});

  function getList(list, token, done) {
    s3.listObjectsV2({
      Bucket: self.bucket,
      Prefix: self.prefix,
      ContinuationToken: token
    }, function (err, data) {
      if (err) throw new Error('s3.listObjectsV2 failed');
      list = list.concat(data.Contents);
      if (data.IsTruncated)
        return getList(list, data.NextContinuationToken, done);
      done(err, list);
    });
  }

  getList([], null, function (err, res) {
    if (err) return done(err);
    res = res.slice(1); // Skip directory entry
    self.partitions = {};
    if (self.options.maxFiles && self.options.maxFiles < res.length)
      self.nPartitions = self.options.maxFiles;
    else
      self.nPartitions = res.length;
    for (var i = 0; i < res.length; i++) {
      self.partitions[i] = new Partition(self.id, i);
      self.partitions[i].path = res[i].Key;
    }
    done();
  });
};

TextS3Dir.prototype.iterate = function (task, p, pipeline, done) {
  var path = this.partitions[p].path;
  var tail = '';
  var s3 = new task.lib.AWS.S3({signatureVersion: 'v4'});
  var rs = s3.getObject({Bucket: this.bucket, Key: path}).createReadStream();
  if (path.slice(-3) === '.gz') rs = rs.pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function () {
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};

function TextDir(sc, dir, options) {
  Dataset.call(this, sc);
  this.type = 'TextDir';
  this.dir = dir;
  this.options = options || {};
}

util.inherits(TextDir, Dataset);

TextDir.prototype.getPartitions = function (done) {
  var self = this;
  fs.readdir(this.dir, function (err, res) {
    if (err) return done(err);
    self.partitions = {};
    if (self.options.maxFiles && self.options.maxFiles < res.length)
      self.nPartitions = self.options.maxFiles;
    else
      self.nPartitions = res.length;
    for (var i = 0; i < res.length; i++) {
      self.partitions[i] = new Partition(self.id, i);
      self.partitions[i].path = res[i];
    }
    done();
  });
};

TextDir.prototype.iterate = function (task, p, pipeline, done) {
  var path = this.dir + this.partitions[p].path;
  var tail = '';
  var rs = task.lib.fs.createReadStream(path);
  if (path.slice(-3) === '.gz') rs = rs.pipe(task.lib.zlib.createGunzip({chunkSize: 65536}));

  rs.on('data', function (chunk) {
    var str = tail + chunk;
    var lines = str.split(/\r\n|\r|\n/);
    var buffer;
    tail = lines.pop();
    for (var i = 0; i < lines.length; i++) {
      buffer = [lines[i]];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
  });

  rs.on('end', function () {
    if (tail) {
      var buffer = [tail];
      for (var t = 0; t < pipeline.length; t++)
        buffer = pipeline[t].transform(pipeline[t], buffer);
    }
    done();
  });
};

function TextFile(sc, file, nPartitions) {
  Dataset.call(this, sc);
  this.file = file;
  this.type = 'TextFile';
  this.nSplit = nPartitions || sc.worker.length;
  this.basedir = sc.basedir;
}

util.inherits(TextFile, Dataset);

TextFile.prototype.getPartitions = function (done) {
  var self = this;

  function getSplits() {
    var u = url.parse(self.file);

    if ((u.protocol == 'hdfs:') && u.slashes && u.hostname && u.port)
      splitHDFSFile(u.path, self.nSplit, mapLogicalSplit);
    else
      splitLocalFile(u.path, self.nSplit, mapLogicalSplit);

    function mapLogicalSplit(split) {
      self.splits = split;
      self.partitions = {};
      self.nPartitions = self.splits.length;
      for (var i = 0; i < self.splits.length; i++)
        self.partitions[i] = new Partition(self.id, i);
      done();
    }
  }

  if (this.watched) {
    var watcher = fs.watch(self.basedir + 'stream', function (event, filename) {
      if ((event == 'rename') && (filename == path.basename(self.file))) {
        watcher.close(); // stop watching directory
        getSplits();
      }
    });
  } else getSplits();
};

TextFile.prototype.iterate = function (task, p, pipeline, done) {
  var buffer;

  function processLine(line) {
    if (!line) return; // skip empty lines
    buffer = [line];
    for (var t = 0; t < pipeline.length; t++)
      buffer = pipeline[t].transform(pipeline[t], buffer);
  }

  function processLineParse(line) {
    if (!line) return; // skip empty lines
    buffer = [JSON.parse(line)];
    for (var t = 0; t < pipeline.length; t++)
      buffer = pipeline[t].transform(pipeline[t], buffer);
  }

  task.lib.readSplit(this.splits, this.splits[p].index, this.parse ? processLineParse : processLine, done, function (part, opt) {
    return task.getReadStreamSync(part, opt);
  });
};

TextFile.prototype.getPreferedLocation = function (pid) {return this.splits[pid].ip;};

function Map(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'Map';
}

util.inherits(Map, Dataset);

Map.prototype.transform = function map(context, data) {
  var tmp = [];
  for (var i = 0; i < data.length; i++)
    tmp[i] = this.mapper(data[i], this.args, this.global);
  return tmp;
};

function FlatMap(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'FlatMap';
}

util.inherits(FlatMap, Dataset);

FlatMap.prototype.transform = function flatmap(context, data) {
  var tmp = [];
  for (var i = 0; i < data.length; i++)
    tmp = tmp.concat(this.mapper(data[i], this.args, this.global));
  return tmp;
};

function MapValues(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'MapValues';
}

util.inherits(MapValues, Dataset);

MapValues.prototype.transform = function (context, data) {
  var tmp = [];
  for (var i = 0; i < data.length; i++)
    tmp[i] = [data[i][0], this.mapper(data[i][1], this.args, this.global)];
  return tmp;
};

function FlatMapValues(parent, mapper, args) {
  Dataset.call(this, parent.sc, [parent]);
  this.mapper = mapper;
  this.args = args;
  this.type = 'FlatMapValues';
}

util.inherits(FlatMapValues, Dataset);

FlatMapValues.prototype.transform = function (context, data) {
  var tmp = [];
  for (var i = 0; i < data.length; i++) {
    var t0 = this.mapper(data[i][1], this.args, this.global);
    tmp = tmp.concat(t0.map(function (e) {return [data[i][0], e];}));
  }
  return tmp;
};

function Filter(parent, filter, args) {
  Dataset.call(this, parent.sc, [parent]);
  this._filter = filter;
  this.args = args;
  this.type = 'Filter';
}

util.inherits(Filter, Dataset);

Filter.prototype.transform = function (context, data) {
  var tmp = [];
  for (var i = 0; i < data.length; i++)
    if (this._filter(data[i], this.args, this.global)) tmp.push(data[i]);
  return tmp;
};

function Random(seed) {
  seed = seed || 0;
  this.x = 123456789 + seed;
  this.y = 188675123;

  // xorshift RNG producing a sequence of 32-bit integers with period 2 ** 64 - 1
  // See http://www.jstatsoft.org/v08/i14/paper by G. Marsaglia
  this.next = function () {
    var t = this.x, u = this.y;
    t ^= t << 8;
    this.x = u;
    return this.y = (u ^ (u >> 22)) ^ (t ^ (t >> 9));
  };

  // Return a float in range [0, 1) like Math.random()
  this.nextDouble = function () {
    return this.next() / 4294967296.0;
  };
}

function Poisson(lambda, initSeed) {
  initSeed = initSeed || 1;

  var rng = new Random(initSeed);

  // Sample an integer from a Poisson distribution with mean lambda
  // (Knuth's multiplicative method)
  this.sample = function () {
    var L = Math.exp(-lambda), k = 0, p = 1;
    do {
      k++;
      p *= rng.nextDouble();
    } while (p > L);
    return k - 1;
  };
}

function Sample(parent, withReplacement, frac, seed) {
  Dataset.call(this, parent.sc, [parent]);
  this.withReplacement = withReplacement;
  this.frac = frac;
  this.rng = withReplacement ? new Poisson(frac, seed) : new Random(seed);
  this.type = 'Sample';
}

util.inherits(Sample, Dataset);

Sample.prototype.transform = function (context, data) {
  var tmp = [], i, j;
  if (this.withReplacement) {
    for (i = 0; i < data.length; i++)
      for (j = 0; j < this.rng.sample(); j++) tmp.push(data[i]);
  } else {
    for (i = 0; i < data.length; i++)
      if (this.rng.nextDouble() < this.frac) tmp.push(data[i]);
  }
  return tmp;
};

function Union(sc, parents) {
  Dataset.call(this, sc, parents);
  this.type = 'Union';
}

util.inherits(Union, Dataset);

Union.prototype.transform = function (context, data) {return data;};

function AggregateByKey(sc, dependencies, reducer, combiner, init, args) {
  Dataset.call(this, sc, dependencies);
  this.combiner = combiner;
  this.reducer = reducer;
  this.init = init;
  this.args = args;
  this.shuffling = true;
  this.executed = false;
  this.buffer = [];
  this.type = 'AggregateByKey';
}

util.inherits(AggregateByKey, Dataset);
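
// Shuffle flow (as implemented below): on the map side, transform() folds
// each incoming [key, value] into a per-target-partition buffer using the
// reducer; spillToDisk() then writes one shuffle file per target partition
// as JSON lines. On the reduce side, iterate() streams every shuffle file
// destined to its partition, merges values per key with the combiner, and
// feeds the resulting [key, value] pairs into the downstream pipeline.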

AggregateByKey.prototype.getPartitions = function (done) {
  if (this.partitions == undefined) {
    var P = 0, i;
    this.partitions = {};
    for (i = 0; i < this.dependencies.length; i++)
      P = Math.max(P, this.dependencies[i].nPartitions);
    for (i = 0; i < P; i++) this.partitions[i] = new Partition(this.id, i);
    this.nPartitions = P;
    this.partitioner = new HashPartitioner(P);
  }
  done();
};

AggregateByKey.prototype.transform = function (context, data) {
  for (var i = 0; i < data.length; i++) {
    var key = data[i][0], value = data[i][1], str = JSON.stringify(key), pid = this.partitioner.getPartitionIndex(key);
    if (this.buffer[pid] == undefined) this.buffer[pid] = {};
    if (this.buffer[pid][str] == undefined) this.buffer[pid][str] = JSON.parse(JSON.stringify(this.init));
    this.buffer[pid][str] = this.reducer(this.buffer[pid][str], value, this.args, this.global);
  }
};

AggregateByKey.prototype.spillToDisk = function (task, done) {
  var i, isLeft, str, hash, data, path;

  if (this.dependencies.length > 1) { // COGROUP
    isLeft = (this.shufflePartitions[task.pid].parentDatasetId == this.dependencies[0].id);
    for (i = 0; i < this.nPartitions; i++) {
      str = '';
      path = task.basedir + 'shuffle/' + task.lib.uuid.v4();
      for (hash in this.buffer[i]) {
        data = isLeft ? [JSON.parse(hash), [this.buffer[i][hash], []]] : [JSON.parse(hash), [[], this.buffer[i][hash]]];
        str += JSON.stringify(data) + '\n';
        if (str.length >= 65536) {
          task.lib.fs.appendFileSync(path, str);
          str = '';
        }
      }
      task.lib.fs.appendFileSync(path, str);
      task.files[i] = {host: task.grid.hostname, path: path};
    }
  } else { // AGGREGATE BY KEY
    for (i = 0; i < this.nPartitions; i++) {
      str = '';
      path = task.basedir + 'shuffle/' + task.lib.uuid.v4();
      for (hash in this.buffer[i]) {
        data = [JSON.parse(hash), this.buffer[i][hash]];
        str += JSON.stringify(data) + '\n';
        if (str.length >= 65536) {
          task.lib.fs.appendFileSync(path, str);
          str = '';
        }
      }
      task.lib.fs.appendFileSync(path, str);
      task.files[i] = {host: task.grid.hostname, path: path};
    }
  }
  done();
};

AggregateByKey.prototype.iterate = function (task, p, pipeline, done) {
  var self = this, cbuffer = {}, cnt = 0, files = [];

  for (var i = 0; i < self.nShufflePartitions; i++)
    files.push(self.shufflePartitions[i].files[p]);

  processShuffleFile(files[cnt], processDone);

  function processShuffleFile(file, done) {
    //task.log('processShuffleFile', p, file.path);
    var lines = new task.lib.Lines();
    task.getReadStream(file, undefined, function (err, stream) {
      stream.pipe(lines);
    });
    lines.on('data', function (linev) {
      for (var i = 0; i < linev.length; i++) {
        var data = JSON.parse(linev[i]), key = data[0], value = data[1], hash = JSON.stringify(key);
        if (cbuffer[hash] != undefined) cbuffer[hash] = self.combiner(cbuffer[hash], value, self.args, self.global);
        else cbuffer[hash] = value;
      }
    });
    lines.on('end', done);
  }

  function processDone() {
    if (++cnt == files.length) {
      for (var key in cbuffer) {
        var buffer = [[JSON.parse(key), cbuffer[key]]];
        for (var t = 0; t < pipeline.length; t++)
          buffer = pipeline[t].transform(pipeline[t], buffer);
      }
      done();
    } else processShuffleFile(files[cnt], processDone);
  }
};

function Cartesian(sc, dependencies) {
  Dataset.call(this, sc, dependencies);
  this.shuffling = true;
  this.executed = false;
  this.buffer = [];
  this.type = 'Cartesian';
}

util.inherits(Cartesian, Dataset);

Cartesian.prototype.getPartitions = function (done) {
  if (this.partitions == undefined) {
    this.pleft = this.dependencies[0].nPartitions;
    this.pright = this.dependencies[1].nPartitions;
    var P = this.pleft * this.pright;
    this.partitions = {};
    this.nPartitions = P;
    for (var i = 0; i < P; i++)
      this.partitions[i] = new Partition(this.id, i);
  }
  done();
};

Cartesian.prototype.transform = function (context, data) {
  for (var i = 0; i < data.length; i++) this.buffer.push(data[i]);
};

Cartesian.prototype.spillToDisk = function (task, done) {
  var str = '', path = task.basedir + 'shuffle/' + task.lib.uuid.v4();
  for (var i = 0; i < this.buffer.length; i++) {
    str += JSON.stringify(this.buffer[i]) + '\n';
    if (str.length >= 65536) {
      task.lib.fs.appendFileSync(path, str);
      str = '';
    }
  }
  task.lib.fs.appendFileSync(path, str);
  task.files = {host: task.grid.hostname, path: path};
  task.log(task.files);
  done();
};

Cartesian.prototype.iterate = function (task, p, pipeline, done) {
  var p1 = Math.floor(p / this.pright);
  var p2 = p % this.pright + this.pleft;
  var self = this;
  var s1 = '';

  task.getReadStream(this.shufflePartitions[p1].files, undefined, function (err, stream1) {
    stream1.on('data', function (s) {s1 += s;});
    stream1.on('end', function () {
      var a1 = s1.split('\n');
      var s2 = '';
      task.getReadStream(self.shufflePartitions[p2].files, undefined, function (err, stream2) {
        stream2.on('data', function (s) {s2 += s;});
        stream2.on('end', function () {
          var a2 = s2.split('\n');
          for (var i = 0; i < a1.length; i++) {
            if (a1[i] == '') continue;
            for (var j = 0; j < a2.length; j++) {
              if (a2[j] == '') continue;
              var buffer = [[JSON.parse(a1[i]), JSON.parse(a2[j])]];
              for (var t = 0; t < pipeline.length; t++)
                buffer = pipeline[t].transform(pipeline[t], buffer);
            }
          }
          done();
        });
      });
    });
  });
};

function SortBy(sc, dependencies, keyFunc, ascending, numPartitions) {
  Dataset.call(this, sc, [dependencies]);
  this.shuffling = true;
  this.executed = false;
  this.keyFunc = keyFunc;
  this.ascending = (ascending == undefined) ? true : ascending;
  this.buffer = [];
  this.numPartitions = numPartitions;
  this.type = 'SortBy';
}

util.inherits(SortBy, Dataset);

SortBy.prototype.getPartitions = function (done) {
  if (this.partitions == undefined) {
    var P = Math.max(this.numPartitions || 1, this.dependencies[0].nPartitions);

    this.partitions = {};
    this.nPartitions = P;
    for (var p = 0; p < P; p++) this.partitions[p] = new Partition(this.id, p);
    this.partitioner = new RangePartitioner(P, this.keyFunc, this.dependencies[0]);
    this.partitioner.init(done);
  } else done();
};

SortBy.prototype.transform = function (context, data) {
  for (var i = 0; i < data.length; i++) {
    var pid = this.partitioner.getPartitionIndex(this.keyFunc(data[i]));
    if (this.buffer[pid] == undefined) this.buffer[pid] = [];
    this.buffer[pid].push(data[i]);
  }
};

SortBy.prototype.spillToDisk = function (task, done) {
  for (var i = 0; i < this.nPartitions; i++) {
    var str = '', path = task.basedir + 'shuffle/' + task.lib.uuid.v4();
    if (this.buffer[i] != undefined) {
      for (var j = 0; j < this.buffer[i].length; j++) {
        str += JSON.stringify(this.buffer[i][j]) + '\n';
        if (str.length >= 65536) {
          task.lib.fs.appendFileSync(path, str);
          str = '';
        }
      }
    }
    task.lib.fs.appendFileSync(path, str);
    task.files[i] = {host: task.grid.hostname, path: path};
  }
  done();
};

SortBy.prototype.iterate = function (task, p, pipeline, done) {
  var self = this, cbuffer = [], cnt = 0, files = [];

  for (var i = 0; i < self.nShufflePartitions; i++)
    files.push(self.shufflePartitions[i].files[p]);

  processShuffleFile(files[cnt], processDone);

  function processShuffleFile(file, done) {
    var lines = new task.lib.Lines();
    task.getReadStream(file, undefined, function (err, stream) {
      stream.pipe(lines);
    });
    lines.on('data', function (linev) {
      for (var i = 0; i < linev.length; i++)
        cbuffer.push(JSON.parse(linev[i]));
    });
    lines.on('end', done);
  }

  function processDone() {
    if (++cnt == files.length) {
      cbuffer.sort(compare);
      for (var i = 0; i < cbuffer.length; i++) {
        var buffer = [cbuffer[i]];
        for (var t = 0; t < pipeline.length; t++)
          buffer = pipeline[t].transform(pipeline[t], buffer);
      }
      done();
    } else processShuffleFile(files[cnt], processDone);

    function compare(a, b) {
      if (self.keyFunc(a) < self.keyFunc(b)) return self.ascending ? -1 : 1;
      if (self.keyFunc(a) > self.keyFunc(b)) return self.ascending ? 1 : -1;
      return 0;
    }
  }
};

function PartitionBy(sc, dependencies, partitioner) {
  Dataset.call(this, sc, [dependencies]);
  this.shuffling = true;
  this.executed = false;
  this.buffer = [];
  this.partitioner = partitioner;
  this.type = 'PartitionBy';
}

util.inherits(PartitionBy, Dataset);

PartitionBy.prototype.getPartitions = function (done) {
  if (this.partitions == undefined) {
    var P = this.partitioner.numPartitions;
    this.partitions = {};
    this.nPartitions = P;
    for (var p = 0; p < P; p++) this.partitions[p] = new Partition(this.id, p);
    if (this.partitioner.init) this.partitioner.init(done);
    else done();
  } else done();
};

PartitionBy.prototype.transform = function (context, data) {
  for (var i = 0; i < data.length; i++) {
    var pid = this.partitioner.getPartitionIndex(data[i][0]);
    if (this.buffer[pid] == undefined) this.buffer[pid] = [];
    this.buffer[pid].push(data[i]);
  }
};

PartitionBy.prototype.spillToDisk = function (task, done) {
  for (var i = 0; i < this.nPartitions; i++) {
    var str = '', path = task.basedir + 'shuffle/' + task.lib.uuid.v4();
    if (this.buffer[i] != undefined) {
      for (var j = 0; j < this.buffer[i].length; j++) {
        str += JSON.stringify(this.buffer[i][j]) + '\n';
        if (str.length >= 65536) {
          task.lib.fs.appendFileSync(path, str);
          str = '';
        }
      }
    }
    task.lib.fs.appendFileSync(path, str);
    task.files[i] = {host: task.grid.hostname, path: path};
  }
  done();
};

PartitionBy.prototype.iterate = function (task, p, pipeline, done) {
  var self = this, cbuffer = [], cnt = 0, files = [];

  for (var i = 0; i < self.nShufflePartitions; i++)
    files.push(self.shufflePartitions[i].files[p]);

  processShuffleFile(files[cnt], processDone);

  function processShuffleFile(file, done) {
    var lines = new task.lib.Lines();
    task.getReadStream(file, undefined, function (err, stream) {
      stream.pipe(lines);
    });
    lines.on('data', function (linev) {
      for (var i = 0; i < linev.length; i++)
        cbuffer.push(JSON.parse(linev[i]));
    });
    lines.on('end', done);
  }

  function processDone() {
    if (++cnt == files.length) {
      for (var i = 0; i < cbuffer.length; i++) {
        var buffer = [cbuffer[i]];
        for (var t = 0; t < pipeline.length; t++)
          buffer = pipeline[t].transform(pipeline[t], buffer);
      }
      done();
    } else processShuffleFile(files[cnt], processDone);
  }
};

function RangePartitioner(numPartitions, keyFunc, dataset) {
  this.numPartitions = numPartitions;

  this.init = function (done) {
    var self = this;
    dataset.sample(false, 0.5).collect(function (err, result) {
      function compare(a, b) {
        if (keyFunc(a) < keyFunc(b)) return -1;
        if (keyFunc(a) > keyFunc(b)) return 1;
        return 0;
      }
      result.sort(compare);
      self.upperbounds = [];
      if (result.length <= numPartitions - 1) {
        self.upperbounds = result; // duplicates could perhaps be removed here
      } else {
        var s = Math.floor(result.length / numPartitions);
        for (var i = 0; i < numPartitions - 1; i++) self.upperbounds.push(result[s * (i + 1)]);
      }
      done();
    });
  };

  this.getPartitionIndex = function (data) {
    for (var i = 0; i < this.upperbounds.length; i++)
      if (data < this.upperbounds[i]) break;
    return i;
  };
}

function HashPartitioner(numPartitions) {
  this.numPartitions = numPartitions;
  this.type = 'HashPartitioner';
}

HashPartitioner.prototype.hash = function (o) {
  var i, h = 0, s = o.toString(), len = s.length;
  for (i = 0; i < len; i++) {
    h = ((h << 5) - h) + s.charCodeAt(i);
    h = h & h; // convert to 32 bit integer
  }
  return Math.abs(h);
};

HashPartitioner.prototype.getPartitionIndex = function (data) {
  return this.hash(data) % this.numPartitions;
};
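
// Example (hypothetical usage; assumes a skale context `sc`; the require
// path is illustrative): spread a pair dataset over 4 partitions by key hash.
//
//   var HashPartitioner = require('./dataset.js').HashPartitioner;
//   sc.parallelize([['a', 1], ['b', 2], ['c', 3]])
//     .partitionBy(new HashPartitioner(4))
//     .collect().then(console.log);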

module.exports = {
  Dataset: Dataset,
  Partition: Partition,
  parallelize: parallelize,
  range: range,
  GzipFile: GzipFile,
  TextFile: TextFile,
  TextDir: TextDir,
  TextS3Dir: TextS3Dir,
  TextS3File: TextS3File,
  Source: Source,
  Stream: Stream,
  Random: Random,
  Map: Map,
  FlatMap: FlatMap,
  MapValues: MapValues,
  FlatMapValues: FlatMapValues,
  Filter: Filter,
  Sample: Sample,
  Union: Union,
  AggregateByKey: AggregateByKey,
  Cartesian: Cartesian,
  SortBy: SortBy,
  PartitionBy: PartitionBy,
  RangePartitioner: RangePartitioner,
  HashPartitioner: HashPartitioner
};