# Skale Reference

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->


- [Overview](#overview)
- [Working with datasets](#working-with-datasets)
  - [Sources](#sources)
  - [Transformations](#transformations)
  - [Actions](#actions)
- [Skale module](#skale-module)
  - [skale.context([config])](#skalecontextconfig)
    - [sc.end()](#scend)
    - [sc.parallelize(array)](#scparallelizearray)
    - [sc.range(start[, end[, step]])](#scrangestart-end-step)
    - [sc.textFile(path)](#sctextfilepath)
    - [sc.lineStream(input_stream)](#sclinestreaminput_stream)
    - [sc.objectStream(input_stream)](#scobjectstreaminput_stream)
  - [Dataset methods](#dataset-methods)
    - [ds.aggregate(reducer, combiner, init[, obj][, done])](#dsaggregatereducer-combiner-init-obj-done)
    - [ds.aggregateByKey(reducer, combiner, init[, obj])](#dsaggregatebykeyreducer-combiner-init-obj)
    - [ds.cartesian(other)](#dscartesianother)
    - [ds.coGroup(other)](#dscogroupother)
    - [ds.collect([done])](#dscollectdone)
    - [ds.count([done])](#dscountdone)
    - [ds.countByKey([done])](#dscountbykeydone)
    - [ds.countByValue([done])](#dscountbyvaluedone)
    - [ds.distinct()](#dsdistinct)
    - [ds.filter(filter[, obj])](#dsfilterfilter-obj)
    - [ds.first([done])](#dsfirstdone)
    - [ds.flatMap(flatMapper[, obj])](#dsflatmapflatmapper-obj)
    - [ds.flatMapValues(flatMapper[, obj])](#dsflatmapvaluesflatmapper-obj)
    - [ds.forEach(callback[, obj][, done])](#dsforeachcallback-obj-done)
    - [ds.groupByKey()](#dsgroupbykey)
    - [ds.intersection(other)](#dsintersectionother)
    - [ds.join(other)](#dsjoinother)
    - [ds.keys()](#dskeys)
    - [ds.leftOuterJoin(other)](#dsleftouterjoinother)
    - [ds.lookup(k[, done])](#dslookupk-done)
    - [ds.map(mapper[, obj])](#dsmapmapper-obj)
    - [ds.mapValues(mapper[, obj])](#dsmapvaluesmapper-obj)
    - [ds.partitionBy(partitioner)](#dspartitionbypartitioner)
    - [ds.persist()](#dspersist)
    - [ds.reduce(reducer, init[, obj][, done])](#dsreducereducer-init-obj-done)
    - [ds.reduceByKey(reducer, init[, obj])](#dsreducebykeyreducer-init-obj)
    - [ds.rightOuterJoin(other)](#dsrightouterjoinother)
    - [ds.sample(withReplacement, frac, seed)](#dssamplewithreplacement-frac-seed)
    - [ds.save(url[, options][, done])](#dssaveurl-options-done)
    - [ds.sortBy(keyfunc[, ascending])](#dssortbykeyfunc-ascending)
    - [ds.sortByKey(ascending)](#dssortbykeyascending)
    - [ds.subtract(other)](#dssubtractother)
    - [ds.take(num[, done])](#dstakenum-done)
    - [ds.top(num[, done])](#dstopnum-done)
    - [ds.union(other)](#dsunionother)
    - [ds.values()](#dsvalues)
  - [Partitioners](#partitioners)
    - [HashPartitioner(numPartitions)](#hashpartitionernumpartitions)
    - [RangePartitioner(numPartitions, keyfunc, dataset)](#rangepartitionernumpartitions-keyfunc-dataset)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

## Overview

Skale is a fast and general purpose distributed data processing
system. It provides a high-level API in Javascript and an optimized
parallel execution engine.

A Skale application consists of a *master* program that runs the
user code and executes various *parallel operations* on a cluster
of *workers*.

The main abstraction Skale provides is a *dataset*, which is similar
to a Javascript *array*, but partitioned across the workers, where
it can be operated on in parallel.

There are several ways to create a dataset: *parallelizing* an existing
array in the master program, referencing a dataset in a distributed
storage system (such as HDFS), or *streaming* the content of any
source that can be processed through Node.js *Streams*. We call a
*source* a function which initializes a dataset.

Datasets support two kinds of operations: *transformations*, which create
a new dataset from an existing one, and *actions*, which
return a value to the *master* program after running a computation
on the dataset.

For example, `map` is a transformation that applies a function to
each element of a dataset, returning a new dataset. On the other
hand, `reduce` is an action that aggregates all elements of a dataset
using some function, and returns the final result to the master.

*Sources* and *transformations* in Skale are *lazy*. They do not
start right away, but are triggered by *actions*, thus allowing
efficient pipelined execution and optimized data transfers.

A first example:

```javascript
var sc = require('skale-engine').context(); // create a new context
sc.parallelize([1, 2, 3, 4]).               // source
  map(function (x) {return x+1}).           // transform
  reduce(function (a, b) {return a+b}, 0).  // action
  then(console.log);                        // process result: 14
```

## Working with datasets

### Sources

After having initialized a cluster context using
[skale.context()](#skalecontextconfig), one can create a dataset
using the following sources:

| Source Name | Description |
| ------------------------------------------------- | ------------------------------------------------------ |
|[gzipFile(path)](#scgzipfilepath) | Create a dataset from a gzipped text file |
|[lineStream(stream)](#sclinestreaminput_stream) | Create a dataset from a text stream |
|[objectStream(stream)](#scobjectstreaminput_stream)| Create a dataset from an object stream |
|[parallelize(array)](#scparallelizearray) | Create a dataset from an array |
|[range(start,end,step)](#scrangestart-end-step) | Create a dataset containing integers from start to end |
|[textFile(path)](#sctextfilepath) | Create a dataset from a text file |

### Transformations

Transformations operate on a dataset and return a new dataset. Note that some
transformations operate only on datasets where each element is a
2-element array of key and value (a `[k,v]` dataset):

    [[Ki,Vi], ..., [Kj, Vj]]

A special transformation `persist()` enables one to *persist* a dataset
in memory, allowing efficient reuse across parallel operations.

|Transformation Name | Description | in | out |
| ----------------- |-----------------------------------------------|-------|-------|
|[aggregateByKey(func, func, init)](#dsaggregatebykeyreducer-combiner-init-obj)| Reduce and combine by key using functions| [k,v]| [k,v]|
|[cartesian(other)](#dscartesianother) | Perform a cartesian product with the other dataset | v w | [v,w]|
|[coGroup(other)](#dscogroupother) | Group data from both datasets sharing the same key | [k,v] [k,w] |[k,[[v],[w]]]|
|[distinct()](#dsdistinct) | Return a dataset where duplicates are removed | v | v|
|[filter(func)](#dsfilterfilter-obj)| Return a dataset of elements on which the function returns true | v | v|
|[flatMap(func)](#dsflatmapflatmapper-obj)| Pass the dataset elements to a function which returns a sequence | v | w|
|[flatMapValues(func)](#dsflatmapvaluesflatmapper-obj)| Pass the dataset [k,v] elements to a function without changing the keys | [k,v] | [k,w]|
|[groupByKey()](#dsgroupbykey)| Group values with the same key | [k,v] | [k,[v]]|
|[intersection(other)](#dsintersectionother) | Return a dataset containing only elements found in both datasets | v w | v|
|[join(other)](#dsjoinother) | Perform an inner join between 2 datasets | [k,v] | [k,[v,w]]|
|[leftOuterJoin(other)](#dsleftouterjoinother) | Join 2 datasets, keeping all keys of the source dataset | [k,v] | [k,[v,w]]|
|[rightOuterJoin(other)](#dsrightouterjoinother) | Join 2 datasets, keeping all keys of the other dataset | [k,v] | [k,[v,w]]|
|[keys()](#dskeys) | Return a dataset of just the keys | [k,v] | k|
|[map(func)](#dsmapmapper-obj) | Return a dataset where elements are passed through a function | v | w|
|[mapValues(func)](#dsmapvaluesmapper-obj)| Map a function to the value field of a key-value dataset | [k,v] | [k,w]|
|[reduceByKey(func, init)](#dsreducebykeyreducer-init-obj)| Combine values with the same key | [k,v] | [k,v]|
|[partitionBy(partitioner)](#dspartitionbypartitioner)| Partition using the partitioner | v | v|
|[persist()](#dspersist) | Idempotent. Keep content of dataset in cache for further reuse. | v | v|
|[sample(rep, frac, seed)](#dssamplewithreplacement-frac-seed) | Sample a dataset, with or without replacement | v | v|
|[sortBy(func)](#dssortbykeyfunc-ascending) | Sort a dataset | v | v|
|[sortByKey()](#dssortbykeyascending) | Sort a [k,v] dataset | [k,v] | [k,v]|
|[subtract(other)](#dssubtractother) | Remove the content of one dataset from the other | v w | v|
|[union(other)](#dsunionother) | Return a dataset containing elements from both datasets | v w | v w|
|[values()](#dsvalues) | Return a dataset of just the values | [k,v] | v|

### Actions

Actions operate on a dataset and send results back to the *master*. Results
are always produced asynchronously and sent to an optional callback function,
or alternatively delivered through a returned [ES6 promise].

|Action Name | Description | out|
|------------------ |----------------------------------------------|--------------|
|[aggregate(func, func, init)](#dsaggregatereducer-combiner-init-obj-done)| Similar to reduce() but may return a different type| value |
|[collect()](#dscollectdone) | Return the content of the dataset | array of elements|
|[count()](#dscountdone) | Return the number of elements in the dataset | number|
|[countByKey()](#dscountbykeydone) | Return the number of occurrences for each key in a `[k,v]` dataset | array of [k,number]|
|[countByValue()](#dscountbyvaluedone) | Return the number of occurrences of each element in the dataset | array of [v,number]|
|[first()](#dsfirstdone) | Return the first element of the dataset | value |
|[forEach(func)](#dsforeachcallback-obj-done)| Apply the provided function to each element of the dataset | empty |
|[lookup(k)](#dslookupk-done) | Return the list of values `v` for key `k` in a `[k,v]` dataset | array of v|
|[reduce(func, init)](#dsreducereducer-init-obj-done)| Aggregate dataset elements into one value using a function | value|
|[take(num)](#dstakenum-done) | Return the first `num` elements of the dataset | array of value|
|[top(num)](#dstopnum-done) | Return the top `num` elements of the dataset | array of value|
|[save(url)](#dssaveurl-options-done) | Save the content of the dataset to a URL | empty |
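
For instance, an action such as `count()` can deliver its result either way:

```javascript
// callback style
sc.range(4).count(function (err, res) { console.log(res); }); // 4

// promise style
sc.range(4).count().then(console.log); // 4
```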

## Skale module

The Skale module is the main entry point for Skale functionality.
To use it, one must `require('skale-engine')`.

### skale.context([config])

Creates and returns a new context which represents the connection
to the Skale cluster, and which can be used to create datasets on that
cluster. Config is an *Object* which defines the cluster server,
with the following defaults:

```javascript
{
  host: 'localhost', // Cluster server host, settable also by SKALE_HOST env
  port: '12346'      // Cluster server port, settable also by SKALE_PORT env
}
```

Example:

```javascript
var skale = require('skale-engine');
var sc = skale.context();
```
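
A context connecting to a remote cluster server can be created by passing
the corresponding config fields (the host name below is illustrative):

```javascript
var skale = require('skale-engine');
var sc = skale.context({host: 'cluster.example.com', port: 12346});
```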

#### sc.end()

Closes the connection to the cluster.
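
A typical program ends the context once its last action has completed, so
that the process can exit; a minimal sketch:

```javascript
var sc = require('skale-engine').context();

sc.parallelize([1, 2, 3]).count().then(function (result) {
  console.log(result); // 3
  sc.end();            // close the connection so the program can exit
});
```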

#### sc.parallelize(array)

Returns a new dataset containing elements from the *Array* array.

Example:

```javascript
var a = sc.parallelize(['Hello', 'World']);
```

#### sc.range(start[, end[, step]])

Returns a new dataset of integers from *start* to *end* (exclusive),
incremented by *step* (default 1) between consecutive elements. If called with a
single argument, the argument is interpreted as *end*, and *start*
is set to 0.

```javascript
sc.range(5).collect().then(console.log)
// [ 0, 1, 2, 3, 4 ]
sc.range(2, 4).collect().then(console.log)
// [ 2, 3 ]
sc.range(10, -5, -3).collect().then(console.log)
// [ 10, 7, 4, 1, -2 ]
```

#### sc.textFile(path)

Returns a new dataset of lines in the file specified by the path *String*.

If *path* ends with a '/' (directory separator), then the dataset
will be composed of all the files in that directory. Sub-directories
are not supported.

If a file name ends with '.gz', then its content will be automatically
uncompressed using GZIP.

Note: if using a path on the local filesystem, the file must also
be accessible at the same path on worker nodes. Either copy the
file to all workers or use a network-mounted shared file system.

As an example, the following program prints the length of a text file:

```javascript
var lines = sc.textFile('data.txt');
lines.map(s => s.length).reduce((a, b) => a + b, 0).then(console.log);
```

#### sc.lineStream(input_stream)

Returns a new dataset of lines of text read from input_stream
*Object*, which is a [readable stream] from which the dataset content is
read.

The following example computes the size of a file using streams:

```javascript
var fs = require('fs');

var stream = fs.createReadStream('data.txt', 'utf8');
sc.lineStream(stream).
  map(s => s.length).
  reduce((a, b) => a + b, 0).
  then(console.log);
```

#### sc.objectStream(input_stream)

Returns a new dataset of Javascript *Objects* read from input_stream
*Object*, which is a [readable stream] from which the dataset content is
read.

The following example counts the number of objects returned in an
object stream using the mongodb native Javascript driver:

```javascript
var cursor = db.collection('clients').find(); // db: an open mongodb connection
sc.objectStream(cursor).count().then(console.log);
```

### Dataset methods

Dataset objects, as created initially by the skale context source
functions above, have the following methods, allowing either to instantiate
a new dataset through a transformation, or to return results to the
master program.

#### ds.aggregate(reducer, combiner, init[, obj][, done])

This [action] computes the aggregated value of the elements
of the dataset using two functions *reducer()* and *combiner()*,
allowing the use of an arbitrary accumulator type, different from the element
type (as opposed to `reduce()`, which imposes the same type for
accumulator and element).
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc*) and with:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value of the next element of the dataset on which
    `aggregate()` operates
  - *obj*: the same parameter *obj* passed to `aggregate()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *combiner*: a function of the form `function(acc1, acc2[, obj])`,
  which returns the merged value of the accumulators and with:
  - *acc1*: the value of an accumulator, computed locally on a worker
  - *acc2*: the value of another accumulator, issued by another worker
  - *obj*: the same parameter *obj* passed to `aggregate()`
- *init*: the initial value of the accumulators that are used by
  *reducer()* and *combiner()*. It should be the identity element
  of the operation (a neutral zero value, i.e. applying it through the
  function should not change the result).
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of the functions over each element of the dataset
- *done*: a callback of the form `function(error, result)` which is
  called at completion. If *undefined*, `aggregate()` returns an
  [ES6 promise].

The following example computes the average of a dataset, avoiding a `map()`:

```javascript
sc.parallelize([3, 5, 2, 7, 4, 8]).
  aggregate((a, v) => [a[0] + v, a[1] + 1],
            (a1, a2) => [a1[0] + a2[0], a1[1] + a2[1]], [0, 0]).
  then(function (data) {
    console.log(data[0] / data[1]);
  })
// 4.8333
```

#### ds.aggregateByKey(reducer, combiner, init[, obj])

When called on a dataset of type `[k,v]`, returns a dataset of type
`[k,v]` where `v` is the aggregated value of all elements of the same
key `k`. The aggregation is performed using two functions *reducer()*
and *combiner()*, allowing the use of an arbitrary accumulator type,
different from the element type.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc*) and with:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value `v` of the next `[k,v]` element of the dataset
    on which `aggregateByKey()` operates
  - *obj*: the same parameter *obj* passed to `aggregateByKey()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *combiner*: a function of the form `function(acc1, acc2[, obj])`,
  which returns the merged value of the accumulators and with:
  - *acc1*: the value of an accumulator, computed locally on a worker
  - *acc2*: the value of another accumulator, issued by another worker
  - *obj*: the same parameter *obj* passed to `aggregateByKey()`
- *init*: the initial value of the accumulators that are used by
  *reducer()* and *combiner()*. It should be the identity element
  of the operation (a neutral zero value, i.e. applying it through the
  function should not change the result).
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of the functions over each element of the dataset

Example:

```javascript
sc.parallelize([['hello', 1], ['hello', 1], ['world', 1]]).
  aggregateByKey((a, b) => a + b, (a, b) => a + b, 0).
  collect().then(console.log);
// [ [ 'hello', 2 ], [ 'world', 1 ] ]
```

#### ds.cartesian(other)

Returns a dataset which contains all possible pairs `[a, b]` where `a`
is in the source dataset and `b` is in the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2]);
var ds2 = sc.parallelize(['a', 'b', 'c']);
ds1.cartesian(ds2).collect().then(console.log);
// [ [ 1, 'a' ], [ 1, 'b' ], [ 1, 'c' ],
//   [ 2, 'a' ], [ 2, 'b' ], [ 2, 'c' ] ]
```

#### ds.coGroup(other)

When called on datasets of type `[k,v]` and `[k,w]`, returns a dataset of type
`[k, [[v], [w]]]`, grouping the data of both datasets sharing the same key.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.coGroup(ds2).collect().then(console.log);
// [ [ 10, [ [ 1 ], [ 'world' ] ] ],
//   [ 20, [ [ 2 ], [] ] ],
//   [ 30, [ [], [ 3 ] ] ] ]
```

#### ds.collect([done])

This [action] returns the content of the dataset in the form of an array.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([1, 2, 3, 4]).
  collect(function (err, res) {
    console.log(res);
  });
// [ 1, 2, 3, 4 ]
```

#### ds.count([done])

This [action] computes the number of elements in the dataset. The
result is passed to the *done()* callback if provided, otherwise
an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([10, 20, 30, 40]).count().then(console.log);
// 4
```

#### ds.countByKey([done])

When called on a dataset of type `[k,v]`, this [action] computes
the number of occurrences of elements for each key. It produces an
array of elements of type `[k,w]` where `w` is the resulting count.
The result is passed to the *done()* callback if provided, otherwise
an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([[10, 1], [20, 2], [10, 4]]).
  countByKey().then(console.log);
// [ [ 10, 2 ], [ 20, 1 ] ]
```

#### ds.countByValue([done])

This [action] computes the number of occurrences of each element in the
dataset and returns an array of elements of type `[v,n]` where `v`
is the element and `n` its number of occurrences. The result is
passed to the *done()* callback if provided, otherwise an [ES6
promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([ 1, 2, 3, 1, 3, 2, 5 ]).
  countByValue().then(console.log);
// [ [ 1, 2 ], [ 2, 2 ], [ 3, 2 ], [ 5, 1 ] ]
```

#### ds.distinct()

Returns a dataset where duplicates are removed.

Example:

```javascript
sc.parallelize([ 1, 2, 3, 1, 4, 3, 5 ]).
  distinct().
  collect().then(console.log);
// [ 1, 2, 3, 4, 5 ]
```

#### ds.filter(filter[, obj])

- *filter*: a function of the form `callback(element[, obj[, wc]])`,
  returning a *Boolean* and where:
  - *element*: the next element of the dataset on which `filter()` operates
  - *obj*: the same parameter *obj* passed to `filter()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *filter* over each element of the dataset

Applies the provided filter function to each element of the source
dataset and returns a new dataset containing the elements that passed the
test.

Example:

```javascript
function filter(data, obj) { return data % obj.modulo; }

sc.parallelize([1, 2, 3, 4]).
  filter(filter, {modulo: 2}).
  collect().then(console.log);
// [ 1, 3 ]
```

#### ds.first([done])

This [action] returns the first element of the dataset.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

```javascript
sc.parallelize([1, 2, 3]).first().then(console.log);
// 1
```

#### ds.flatMap(flatMapper[, obj])

Applies the provided mapper function to each element of the source
dataset and returns a new dataset made of the concatenated results.

- *flatMapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an *Array* and where:
  - *element*: the next element of the dataset on which `flatMap()` operates
  - *obj*: the same parameter *obj* passed to `flatMap()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *flatMapper* over each element of the dataset

Example:

```javascript
sc.range(5).flatMap(a => [a, a]).collect().then(console.log);
// [ 0, 0, 1, 1, 2, 2, 3, 3, 4, 4 ]
```

#### ds.flatMapValues(flatMapper[, obj])

Applies the provided flatMapper function to the value of each `[k,v]`
element of the source dataset and returns a new dataset containing, for
each value `w` of the array returned by `flatMapper(v)`, an element
`[k,w]`, keeping the key unchanged for each source element.

- *flatMapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an *Array* and where:
  - *element*: the value v of the next [k,v] element of the dataset on
    which `flatMapValues()` operates
  - *obj*: the same parameter *obj* passed to `flatMapValues()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *flatMapper* over each element of the dataset

Example:

```javascript
function valueFlatMapper(data, obj) {
  var tmp = [];
  for (var i = 0; i < obj.N; i++) tmp.push(data * obj.fact);
  return tmp;
}

sc.parallelize([['hello', 1], ['world', 2]]).
  flatMapValues(valueFlatMapper, {N: 2, fact: 2}).
  collect().then(console.log);
// [ [ 'hello', 2 ], [ 'hello', 2 ], [ 'world', 4 ], [ 'world', 4 ] ]
```

#### ds.forEach(callback[, obj][, done])

This [action] applies a *callback* function on each element of the dataset.
If provided, the *done()* callback is invoked at completion, otherwise an
[ES6 promise] is returned.

- *callback*: a function of the form `function(val[, obj[, wc]])`,
  which returns *null* and with:
  - *val*: the value of the next element of the dataset on which
    `forEach()` operates
  - *obj*: the same parameter *obj* passed to `forEach()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *callback* over each element of the dataset
- *done*: a callback of the form `function(error, result)` which is
  called at completion.

In the following example, the `console.log()` callback provided
to `forEach()` is executed on the workers, so its output may not be
visible on the master:

```javascript
sc.parallelize([1, 2, 3, 4]).
  forEach(console.log).
  then(() => console.log('finished'));
```

#### ds.groupByKey()

When called on a dataset of type `[k,v]`, returns a dataset of type `[k, [v]]`
where values with the same key are grouped.

Example:

```javascript
sc.parallelize([[10, 1], [20, 2], [10, 4]]).
  groupByKey().collect().then(console.log);
// [ [ 10, [ 1, 4 ] ], [ 20, [ 2 ] ] ]
```

#### ds.intersection(other)

Returns a dataset containing only the elements found in both the source
dataset and the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.intersection(ds2).collect().then(console.log); // [ 3, 4, 5 ]
```

#### ds.join(other)

When called on a source dataset of type `[k,v]` and an *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs containing all combinations
of elements for each key present in both datasets.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.join(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ] ]
```

#### ds.keys()

When called on a source dataset of type `[k,v]`, returns a dataset with just
the elements `k`.

Example:

```javascript
sc.parallelize([[10, 'world'], [30, 3]]).
  keys().collect().then(console.log);
// [ 10, 30 ]
```

#### ds.leftOuterJoin(other)

When called on a source dataset of type `[k,v]` and an *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs, keeping all keys of the
source dataset; `w` is set to *null* for keys absent from the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.leftOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 20, [ 2, null ] ] ]
```

#### ds.lookup(k[, done])

When called on a source dataset of type `[k,v]`, this [action] returns an
array of the values `v` for key `k`.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([[10, 'world'], [20, 2], [10, 1], [30, 3]]).
  lookup(10).then(console.log);
// [ 'world', 1 ]
```

#### ds.map(mapper[, obj])

Applies the provided mapper function to each element of the source
dataset and returns a new dataset.

- *mapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an element and where:
  - *element*: the next element of the dataset on which `map()` operates
  - *obj*: the same parameter *obj* passed to `map()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *mapper* over each element of the dataset

Example:

```javascript
sc.parallelize([1, 2, 3, 4]).
  map((data, obj) => data * obj.scaling, {scaling: 1.2}).
  collect().then(console.log);
// [ 1.2, 2.4, 3.6, 4.8 ]
```
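
The worker context *wc* can be used to keep per-worker state persisting
across invocations, for instance to cache a resource set up once per worker.
A minimal sketch (the counter below is purely illustrative):

```javascript
function mapper(data, obj, wc) {
  wc.calls = (wc.calls || 0) + 1; // per-worker state, persists across calls
  return data * obj.scaling;
}

sc.parallelize([1, 2, 3, 4]).
  map(mapper, {scaling: 2}).
  collect().then(console.log);
// [ 2, 4, 6, 8 ]
```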

#### ds.mapValues(mapper[, obj])

- *mapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an element and where:
  - *element*: the value v of the next [k,v] element of the dataset on
    which `mapValues()` operates
  - *obj*: the same parameter *obj* passed to `mapValues()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *mapper* over each element of the dataset

Applies the provided mapper function to the value of each `[k,v]`
element of the source dataset and returns a new dataset containing elements
defined as `[k, mapper(v)]`, keeping the key unchanged for each
source element.

Example:

```javascript
sc.parallelize([['hello', 1], ['world', 2]]).
  mapValues((a, obj) => a * obj.fact, {fact: 2}).
  collect().then(console.log);
// [ ['hello', 2], ['world', 4] ]
```

#### ds.partitionBy(partitioner)

Returns a dataset partitioned using the specified partitioner. The
purpose of this transformation is not to change the dataset content,
but to increase processing speed by ensuring that the elements
accessed by further transformations reside in the same partition.

Example:

```javascript
var skale = require('skale-engine');
var sc = skale.context();

sc.parallelize([['hello', 1], ['world', 1], ['hello', 2], ['world', 2], ['cedric', 3]])
  .partitionBy(new skale.HashPartitioner(3))
  .collect().then(console.log)
// [ ['world', 1], ['world', 2], ['hello', 1], ['hello', 2], ['cedric', 3] ]
```

#### ds.persist()

Returns the dataset, and persists the dataset content on disk (and
in memory if available) in order to directly reuse content in further
tasks.

Example:

```javascript
var dataset = sc.range(100).map(a => a * a);

// First action: compute dataset
dataset.collect().then(console.log)

// Second action: reuse dataset, avoid map transform
dataset.collect().then(console.log)
```

#### ds.reduce(reducer, init[, obj][, done])

This [action] returns the aggregated value of the elements
of the dataset using a *reducer()* function.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc* and *val*) and with:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value of the next element of the dataset on which
    `reduce()` operates
  - *obj*: the same parameter *obj* passed to `reduce()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *init*: the initial value of the accumulators that are used by
  *reducer()*. It should be the identity element of the operation
  (i.e. applying it through the function should not change the result).
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *reducer* over each element of the dataset
- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([1, 2, 4, 8]).
  reduce((a, b) => a + b, 0).
  then(console.log);
// 15
```

#### ds.reduceByKey(reducer, init[, obj])

- *reducer*: a function of the form `callback(acc, val[, obj[, wc]])`,
  returning the next value of the accumulator (which must be of the
  same type as *acc* and *val*) and where:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value `v` of the next `[k,v]` element of the dataset on
    which `reduceByKey()` operates
  - *obj*: the same parameter *obj* passed to `reduceByKey()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *init*: the initial value of the accumulator for each key. Will be
  passed to *reducer*.
- *obj*: user-provided data, passed from master to workers (it must
  therefore be serializable); *obj* is shared amongst the invocations
  of *reducer* over each element of the dataset

When called on a dataset of type `[k,v]`, returns a dataset of type `[k,v]`
where the values of each key are aggregated using the *reducer*
function and the *init* initial value.

Example:

```javascript
sc.parallelize([[10, 1], [10, 2], [10, 4]]).
  reduceByKey((a, b) => a + b, 0).
  collect().then(console.log);
// [ [10, 7] ]
```

#### ds.rightOuterJoin(other)

When called on a source dataset of type `[k,v]` and an *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs, keeping all keys of the
*other* dataset; `v` is set to *null* for keys absent from the source dataset.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.rightOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 30, [ null, 3 ] ] ]
```

#### ds.sample(withReplacement, frac, seed)

- *withReplacement*: *Boolean* value, *true* if data must be sampled
  with replacement
- *frac*: *Number* value of the fraction of the source dataset to return
- *seed*: *Number* value of the pseudo-random seed

Returns a dataset created by sampling a fraction *frac* of the source dataset,
with or without replacement, using the given random generator *seed*.

Example:

```javascript
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8]).
  sample(true, 0.5, 0).
  collect().then(console.log);
// [ 1, 1, 3, 4, 4, 5, 7 ]
```

#### ds.save(url[, options][, done])

This [action] saves the content of the dataset to the destination URL. The
destination is a flat directory which will contain as many files as there are
partitions in the dataset. Files are named after partition numbers, starting
at 0. The file format is a stream of JSON strings (one per dataset
element) separated by newlines.

- *url*: a *String* of the general form `protocol://host/path` or `/path`. See
  below for supported protocols
- *options*: an *Object* with the following fields:
  - *gzip*: *Boolean* (default false) to enable gzip compression. If compression
    is enabled, files are suffixed with `.gz`
- *done*: an optional callback function of the form `function(error, result)`
  called at completion. If not provided, an [ES6 promise] is returned.

##### File protocol

The URL form is `file://path` or simply `path` where *path* is an absolute
pathname in the master host local file system.

Example:

```javascript
sc.range(300).save('/tmp/results/').then(() => sc.end());
// will produce /tmp/results/0, /tmp/results/1
```

##### AWS S3 protocol

The URL form is `s3://bucket/key`. AWS credentials must be provided by the
environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.

Example:

```javascript
sc.range(300).save('s3://myproject/mydataset', {gzip: true}).then(() => sc.end());
// will produce https://myproject.s3.amazonaws.com/mydataset/0.gz
```

#### ds.sortBy(keyfunc[, ascending])

Returns a dataset sorted by the given *keyfunc*.

- *keyfunc*: a function of the form `function(element)` which returns
  a value used for comparison in the sort function and where `element`
  is the next element of the dataset on which `sortBy()` operates
- *ascending*: a boolean to set the sort direction. Default: true

Example:

```javascript
sc.parallelize([4, 6, 10, 5, 1, 2, 9, 7, 3, 0])
  .sortBy(a => a)
  .collect().then(console.log)
// [ 0, 1, 2, 3, 4, 5, 6, 7, 9, 10 ]
```

#### ds.sortByKey(ascending)

When called on a dataset of type `[k,v]`, returns a dataset of type `[k,v]`
sorted on `k`. The optional parameter *ascending* is a boolean which sets
the sort direction, true by default.

Example:

```javascript
sc.parallelize([['world', 2], ['cedric', 3], ['hello', 1]])
  .sortByKey()
  .collect().then(console.log)
// [['cedric', 3], ['hello', 1], ['world', 2]]
```

#### ds.subtract(other)

Returns a dataset containing only the elements of the source dataset which
are not in the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.subtract(ds2).collect().then(console.log);
// [ 1, 2 ]
```

#### ds.take(num[, done])

This [action] returns an array of the first `num` elements of the
source dataset. The result is passed to the *done()* callback if
provided, otherwise an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.range(5).take(2).then(console.log);
// [ 0, 1 ]
```

#### ds.top(num[, done])

This [action] returns an array of the top `num` elements of the
source dataset. The result is passed to the *done()* callback if
provided, otherwise an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.range(5).top(2).then(console.log);
// [3, 4]
```

#### ds.union(other)

Returns a dataset that contains the union of the elements in the source
dataset and the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.union(ds2).collect().then(console.log);
// [ 1, 2, 3, 4, 5, 3, 4, 5, 6, 7 ]
```

#### ds.values()

When called on a source dataset of type `[k,v]`, returns a dataset with just
the elements `v`.

Example:

```javascript
sc.parallelize([[10, 'world'], [30, 3]]).
  values().collect().then(console.log);
// [ 'world', 3 ]
```

### Partitioners

A partitioner is an object passed to
[ds.partitionBy(partitioner)](#dspartitionbypartitioner) which
places data in partitions according to a strategy, for example hash
partitioning, where data having the same key are placed in the same
partition, or range partitioning, where data in the same range are
in the same partition. This is useful to accelerate processing, as
it limits data transfers between workers during jobs.

A partitioner object must provide the following properties:

- *numPartitions*: a *Number* of partitions for the dataset
- *getPartitionIndex*: a *Function* of type `function(element)`
  which returns the partition index (an integer between 0 and
  *numPartitions* - 1) for the `element` of the dataset on which
  `partitionBy()` operates.
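
As an illustration, a minimal custom partitioner satisfying this contract
could route `[k,v]` elements by key parity (a hypothetical example, not
part of the skale API):

```javascript
function ParityPartitioner() {
  this.numPartitions = 2;
  this.getPartitionIndex = function (element) {
    return element[0] % 2; // even keys go to partition 0, odd keys to partition 1
  };
}

sc.parallelize([[10, 'a'], [11, 'b'], [12, 'c']]).
  partitionBy(new ParityPartitioner()).
  collect().then(console.log);
```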

#### HashPartitioner(numPartitions)

Returns a partitioner object which implements hash based partitioning
using a hash checksum of each element as a string.

- *numPartitions*: *Number* of partitions for this dataset

Example:

```javascript
var hp = new skale.HashPartitioner(3)
var dataset = sc.range(10).partitionBy(hp)
```

#### RangePartitioner(numPartitions, keyfunc, dataset)

Returns a partitioner object which first defines ranges by sampling
the dataset and then places elements by comparing them with those ranges.

- *numPartitions*: *Number* of partitions for this dataset
- *keyfunc*: a function of the form `function(element)` which returns
  a value used for comparison in the sort function and where `element`
  is the next element of the dataset on which `partitionBy()` operates
- *dataset*: the dataset object on which `partitionBy()` operates

Example:

```javascript
var dataset = sc.range(100)
var rp = new skale.RangePartitioner(3, a => a, dataset)
var partitioned = dataset.partitionBy(rp)
```

[readable stream]: https://nodejs.org/api/stream.html#stream_class_stream_readable
[ES6 promise]: https://promisesaplus.com
[action]: #actions