# Skale Reference

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->


- [Overview](#overview)
- [Working with datasets](#working-with-datasets)
  - [Sources](#sources)
  - [Transformations](#transformations)
  - [Actions](#actions)
- [Skale module](#skale-module)
  - [skale.context([config])](#skalecontextconfig)
    - [sc.end()](#scend)
    - [sc.parallelize(array)](#scparallelizearray)
    - [sc.range(start[, end[, step]])](#scrangestart-end-step)
    - [sc.textFile(path)](#sctextfilepath)
    - [sc.lineStream(input_stream)](#sclinestreaminput_stream)
    - [sc.objectStream(input_stream)](#scobjectstreaminput_stream)
  - [Dataset methods](#dataset-methods)
    - [ds.aggregate(reducer, combiner, init[, obj][, done])](#dsaggregatereducer-combiner-init-obj-done)
    - [ds.aggregateByKey(reducer, combiner, init[, obj])](#dsaggregatebykeyreducer-combiner-init-obj)
    - [ds.cartesian(other)](#dscartesianother)
    - [ds.coGroup(other)](#dscogroupother)
    - [ds.collect([done])](#dscollectdone)
    - [ds.count([done])](#dscountdone)
    - [ds.countByKey([done])](#dscountbykeydone)
    - [ds.countByValue([done])](#dscountbyvaluedone)
    - [ds.distinct()](#dsdistinct)
    - [ds.filter(filter[, obj])](#dsfilterfilter-obj)
    - [ds.first([done])](#dsfirstdone)
    - [ds.flatMap(flatMapper[, obj])](#dsflatmapflatmapper-obj)
    - [ds.flatMapValues(flatMapper[, obj])](#dsflatmapvaluesflatmapper-obj)
    - [ds.forEach(callback[, obj][, done])](#dsforeachcallback-obj-done)
    - [ds.groupByKey()](#dsgroupbykey)
    - [ds.intersection(other)](#dsintersectionother)
    - [ds.join(other)](#dsjoinother)
    - [ds.keys()](#dskeys)
    - [ds.leftOuterJoin(other)](#dsleftouterjoinother)
    - [ds.lookup(k[, done])](#dslookupk-done)
    - [ds.map(mapper[, obj])](#dsmapmapper-obj)
    - [ds.mapValues(mapper[, obj])](#dsmapvaluesmapper-obj)
    - [ds.partitionBy(partitioner)](#dspartitionbypartitioner)
    - [ds.persist()](#dspersist)
    - [ds.reduce(reducer, init[, obj][, done])](#dsreducereducer-init-obj-done)
    - [ds.reduceByKey(reducer, init[, obj])](#dsreducebykeyreducer-init-obj)
    - [ds.rightOuterJoin(other)](#dsrightouterjoinother)
    - [ds.sample(withReplacement, frac, seed)](#dssamplewithreplacement-frac-seed)
    - [ds.sortBy(keyfunc[, ascending])](#dssortbykeyfunc-ascending)
    - [ds.sortByKey(ascending)](#dssortbykeyascending)
    - [ds.subtract(other)](#dssubtractother)
    - [ds.take(num[, done])](#dstakenum-done)
    - [ds.top(num[, done])](#dstopnum-done)
    - [ds.union(other)](#dsunionother)
    - [ds.values()](#dsvalues)
  - [Partitioners](#partitioners)
    - [HashPartitioner(numPartitions)](#hashpartitionernumpartitions)
    - [RangePartitioner(numPartitions, keyfunc, dataset)](#rangepartitionernumpartitions-keyfunc-dataset)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

## Overview

Skale is a fast and general purpose distributed data processing
system. It provides a high-level API in Javascript and an optimized
parallel execution engine.

A Skale application consists of a *master* program that runs the
user code and executes various *parallel operations* on a cluster
of *workers*.
The main abstraction Skale provides is a *dataset*, which is similar
to a Javascript *array*, but partitioned across the workers so it
can be operated on in parallel.

There are several ways to create a dataset: *parallelizing* an existing
array in the master program, referencing a dataset in a distributed
storage system (such as HDFS), or *streaming* the content of any
source that can be processed through Node.js *Streams*. A function
which initializes a dataset is called a *source*.

Datasets support two kinds of operations: *transformations*, which create
a new dataset from an existing one, and *actions*, which
return a value to the *master* program after running a computation
on the dataset.

For example, `map` is a transformation that applies a function to
each element of a dataset, returning a new dataset. On the other
hand, `reduce` is an action that aggregates all elements of a dataset
using some function, and returns the final result to the master.

*Sources* and *transformations* in Skale are *lazy*. They do not
start right away, but are triggered by *actions*, thus allowing
efficient pipelined execution and optimized data transfers.

A first example:

```javascript
var sc = require('skale-engine').context(); // create a new context
sc.parallelize([1, 2, 3, 4]).               // source
   map(function (x) {return x+1}).          // transform
   reduce(function (a, b) {return a+b}, 0). // action
   then(console.log);                       // process result: 14
```

## Working with datasets

### Sources

After having initialized a cluster context using
[skale.context()](#skalecontextconfig), one can create a dataset
using the following sources:

| Source Name | Description |
| ------------------------------------------------- | ----------------------------------------------------- |
|[lineStream(stream)](#sclinestreaminput_stream) | Create a dataset from a text stream |
|[objectStream(stream)](#scobjectstreaminput_stream)| Create a dataset from an object stream |
|[parallelize(array)](#scparallelizearray) | Create a dataset from an array |
|[range(start,end,step)](#scrangestart-end-step) | Create a dataset containing integers from start to end|
|[textFile(path)](#sctextfilepath) | Create a dataset from a regular text file |

### Transformations

Transformations operate on a dataset and return a new dataset. Note that some
transformations operate only on datasets where each element is a two-element
array of key and value (a `[k,v]` dataset):

    [[Ki,Vi], ..., [Kj, Vj]]

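For example, a `[k,v]` dataset can be built directly with `parallelize` (a minimal sketch):

```javascript
var pairs = sc.parallelize([['a', 1], ['b', 2], ['a', 3]]); // a [k,v] dataset
```
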
A special transformation `persist()` enables one to *persist* a dataset
in memory, allowing efficient reuse across parallel operations.

|Transformation Name | Description | in | out |
| ----------------- |-----------------------------------------------|-------|-------|
|[aggregateByKey(func, func, init)](#dsaggregatebykeyreducer-combiner-init-obj)| Reduce and combine by key using functions| [k,v]| [k,v]|
|[cartesian(other)](#dscartesianother) | Perform a cartesian product with the other dataset | v w | [v,w]|
|[coGroup(other)](#dscogroupother) | Group data from both datasets sharing the same key | [k,v] [k,w] |[k,[[v],[w]]]|
|[distinct()](#dsdistinct) | Return a dataset where duplicates are removed | v | v|
|[filter(func)](#dsfilterfilter-obj)| Return a dataset of elements on which function returns true | v | v|
|[flatMap(func)](#dsflatmapflatmapper-obj)| Pass the dataset elements to a function which returns a sequence | v | w|
|[flatMapValues(func)](#dsflatmapvaluesflatmapper-obj)| Pass the dataset [k,v] elements to a function without changing the keys | [k,v] | [k,w]|
|[groupByKey()](#dsgroupbykey)| Group values with the same key | [k,v] | [k,[v]]|
|[intersection(other)](#dsintersectionother) | Return a dataset containing only elements found in both datasets | v w | v|
|[join(other)](#dsjoinother) | Perform an inner join between 2 datasets | [k,v] | [k,[v,w]]|
|[leftOuterJoin(other)](#dsleftouterjoinother) | Join 2 datasets, keeping all keys of the first | [k,v] | [k,[v,w]]|
|[rightOuterJoin(other)](#dsrightouterjoinother) | Join 2 datasets, keeping all keys of the other | [k,v] | [k,[v,w]]|
|[keys()](#dskeys) | Return a dataset of just the keys | [k,v] | k|
|[map(func)](#dsmapmapper-obj) | Return a dataset where elements are passed through a function | v | w|
|[mapValues(func)](#dsmapvaluesmapper-obj)| Map a function to the value field of key-value dataset | [k,v] | [k,w]|
|[reduceByKey(func, init)](#dsreducebykeyreducer-init-obj)| Combine values with the same key | [k,v] | [k,w]|
|[partitionBy(partitioner)](#dspartitionbypartitioner)| Partition using the partitioner | v | v|
|[persist()](#dspersist) | Idempotent. Keep content of dataset in cache for further reuse. | v | v|
|[sample(rep, frac, seed)](#dssamplewithreplacement-frac-seed) | Sample a dataset, with or without replacement | v | v|
|[sortBy(func)](#dssortbykeyfunc-ascending) | Sort a dataset | v | v|
|[sortByKey()](#dssortbykeyascending) | Sort a [k,v] dataset | [k,v] | [k,v]|
|[subtract(other)](#dssubtractother) | Remove the content of one dataset | v w | v|
|[union(other)](#dsunionother) | Return a dataset containing elements from both datasets | v | v w|
|[values()](#dsvalues) | Return a dataset of just the values | [k,v] | v|

### Actions

Actions operate on a dataset and send results back to the *master*. Results
are always produced asynchronously and sent to an optional callback function,
or alternatively delivered through a returned [ES6 promise].

|Action Name | Description | out|
|------------------ |----------------------------------------------|--------------|
|[aggregate(func, func, init)](#dsaggregatereducer-combiner-init-obj-done)| Similar to reduce() but may return a different type| value |
|[collect()](#dscollectdone) | Return the content of dataset | array of elements|
|[count()](#dscountdone) | Return the number of elements from dataset | number|
|[countByKey()](#dscountbykeydone) | Return the number of occurrences for each key in a `[k,v]` dataset | array of [k,number]|
|[countByValue()](#dscountbyvaluedone) | Return the number of occurrences of elements from dataset | array of [v,number]|
|[first()](#dsfirstdone) | Return the first element in dataset | value |
|[forEach(func)](#dsforeachcallback-obj-done)| Apply the provided function to each element of the dataset | empty |
|[lookup(k)](#dslookupk-done) | Return the list of values `v` for key `k` in a `[k,v]` dataset | array of v|
|[reduce(func, init)](#dsreducereducer-init-obj-done)| Aggregate dataset elements into one value using a function | value|
|[take(num)](#dstakenum-done) | Return the first `num` elements of dataset | array of value|
|[top(num)](#dstopnum-done) | Return the top `num` elements of dataset | array of value|

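For example, `count()` can deliver its result either way (a minimal sketch
using `parallelize` from above):

```javascript
var ds = sc.parallelize([1, 2, 3, 4]);

// Callback style: pass a done(error, result) callback as last argument
ds.count(function (err, result) {
  console.log(result); // 4
});

// Promise style: omit the callback and use the returned promise
ds.count().then(function (result) {
  console.log(result); // 4
});
```
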
## Skale module

The Skale module is the main entry point for Skale functionality.
To use it, one must `require('skale-engine')`.

### skale.context([config])

Creates and returns a new context which represents the connection
to the Skale cluster, and which can be used to create datasets on that
cluster. Config is an *Object* which defines the cluster server,
with the following defaults:

```javascript
{
  host: 'localhost', // Cluster server host, settable also by SKALE_HOST env
  port: '12346'      // Cluster server port, settable also by SKALE_PORT env
}
```

Example:

```javascript
var skale = require('skale-engine');
var sc = skale.context();
```

#### sc.end()

Closes the connection to the cluster.

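A typical program calls `sc.end()` once all results have been delivered,
so that the master process can exit; for example:

```javascript
var sc = require('skale-engine').context();

sc.range(4).collect().then(function (result) {
  console.log(result); // [ 0, 1, 2, 3 ]
  sc.end();            // close the connection, letting the process exit
});
```
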
#### sc.parallelize(array)

Returns a new dataset containing elements from the *Array* array.

Example:

```javascript
var a = sc.parallelize(['Hello', 'World']);
```

#### sc.range(start[, end[, step]])

Returns a new dataset of integers from *start* to *end* (exclusive),
incremented by *step* (default 1). If called with a
single argument, the argument is interpreted as *end*, and *start*
is set to 0.

```javascript
sc.range(5).collect().then(console.log)
// [ 0, 1, 2, 3, 4 ]
sc.range(2, 4).collect().then(console.log)
// [ 2, 3 ]
sc.range(10, -5, -3).collect().then(console.log)
// [ 10, 7, 4, 1, -2 ]
```

#### sc.textFile(path)

Returns a new dataset of lines composing the file specified by path
*String*.

Note: If using a path on the local filesystem, the file must also
be accessible at the same path on worker nodes. Either copy the
file to all workers or use a network-mounted shared file system.

Example: the following program prints the length of a text file:

```javascript
var lines = sc.textFile('data.txt');
lines.map(s => s.length).reduce((a, b) => a + b, 0).then(console.log);
```

#### sc.lineStream(input_stream)

Returns a new dataset of lines of text read from input_stream
*Object*, which is a [readable stream] from which dataset content
is read.

The following example computes the size of a file using streams:

```javascript
var fs = require('fs');

var stream = fs.createReadStream('data.txt', 'utf8');
sc.lineStream(stream).
   map(s => s.length).
   reduce((a, b) => a + b, 0).
   then(console.log);
```

#### sc.objectStream(input_stream)

Returns a new dataset of Javascript *Objects* read from input_stream
*Object*, which is a [readable stream] from which dataset content
is read.

The following example counts the number of objects returned in an
object stream using the mongodb native Javascript driver:

```javascript
var cursor = db.collection('clients').find();
sc.objectStream(cursor).count().then(console.log);
```

### Dataset methods

Dataset objects, as initially created by the above skale context source
functions, have the following methods, allowing one either to instantiate
a new dataset through a transformation, or to return results to the
master program.

#### ds.aggregate(reducer, combiner, init[, obj][, done])

This [action] computes the aggregated value of the elements
of the dataset using two functions *reducer()* and *combiner()*,
allowing the use of an arbitrary accumulator type, different from the
element type (as opposed to `reduce()`, which imposes the same type for
accumulator and element).
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc*) and with:
    - *acc*: the value of the accumulator, initially set to *init*
    - *val*: the value of the next element of the dataset on which
      `aggregate()` operates
    - *obj*: the same parameter *obj* passed to `aggregate()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *combiner*: a function of the form `function(acc1, acc2[, obj])`,
  which returns the merged value of accumulators and with:
    - *acc1*: the value of an accumulator, computed locally on a worker
    - *acc2*: the value of another accumulator, issued by another worker
    - *obj*: the same parameter *obj* passed to `aggregate()`
- *init*: the initial value of the accumulators that are used by
  *reducer()* and *combiner()*. It should be the identity element
  of the operation (a neutral zero value, i.e. applying it through the
  function should not change the result).
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.
- *done*: a callback of the form `function(error, result)` which is
  called at completion. If *undefined*, `aggregate()` returns an
  [ES6 promise].

The following example computes the average of a dataset, avoiding a `map()`:

```javascript
sc.parallelize([3, 5, 2, 7, 4, 8]).
   aggregate((a, v) => [a[0] + v, a[1] + 1],
             (a1, a2) => [a1[0] + a2[0], a1[1] + a2[1]], [0, 0]).
   then(function(data) {
     console.log(data[0] / data[1]);
   })
// 4.8333
```

#### ds.aggregateByKey(reducer, combiner, init[, obj])

When called on a dataset of type `[k,v]`, returns a dataset of type
`[k,v]` where `v` is the aggregated value of all elements of same
key `k`. The aggregation is performed using two functions *reducer()*
and *combiner()*, allowing the use of an arbitrary accumulator type,
different from the element type.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc*) and with:
    - *acc*: the value of the accumulator, initially set to *init*
    - *val*: the value `v` of the next `[k,v]` element of the dataset
      on which `aggregateByKey()` operates
    - *obj*: the same parameter *obj* passed to `aggregateByKey()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *combiner*: a function of the form `function(acc1, acc2[, obj])`,
  which returns the merged value of accumulators and with:
    - *acc1*: the value of an accumulator, computed locally on a worker
    - *acc2*: the value of another accumulator, issued by another worker
    - *obj*: the same parameter *obj* passed to `aggregateByKey()`
- *init*: the initial value of the accumulators that are used by
  *reducer()* and *combiner()*. It should be the identity element
  of the operation (a neutral zero value, i.e. applying it through the
  function should not change the result).
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.

Example:

```javascript
sc.parallelize([['hello', 1], ['hello', 1], ['world', 1]]).
   aggregateByKey((a, b) => a + b, (a, b) => a + b, 0).
   collect().then(console.log);
// [ [ 'hello', 2 ], [ 'world', 1 ] ]
```

#### ds.cartesian(other)

Returns a dataset which contains all possible pairs `[a, b]` where `a`
is in the source dataset and `b` is in the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2]);
var ds2 = sc.parallelize(['a', 'b', 'c']);
ds1.cartesian(ds2).collect().then(console.log);
// [ [ 1, 'a' ], [ 1, 'b' ], [ 1, 'c' ],
//   [ 2, 'a' ], [ 2, 'b' ], [ 2, 'c' ] ]
```

#### ds.coGroup(other)

When called on datasets of type `[k,v]` and `[k,w]`, returns a dataset of type
`[k, [[v], [w]]]`, where data of both datasets share the same key.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.coGroup(ds2).collect().then(console.log);
// [ [ 10, [ [ 1 ], [ 'world' ] ] ],
//   [ 20, [ [ 2 ], [] ] ],
//   [ 30, [ [], [ 3 ] ] ] ]
```

#### ds.collect([done])

This [action] returns the content of the dataset in the form of an array.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([1, 2, 3, 4]).
   collect(function (err, res) {
     console.log(res);
   });
// [ 1, 2, 3, 4 ]
```

#### ds.count([done])

This [action] computes the number of elements in the dataset. The
result is passed to the *done()* callback if provided, otherwise
an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([10, 20, 30, 40]).count().then(console.log);
// 4
```

#### ds.countByKey([done])

When called on a dataset of type `[k,v]`, this [action] computes
the number of occurrences of elements for each key. It produces an
array of elements of type `[k,w]` where `w` is the resulting count.
The result is passed to the *done()* callback if provided, otherwise
an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([[10, 1], [20, 2], [10, 4]]).
   countByKey().then(console.log);
// [ [ 10, 2 ], [ 20, 1 ] ]
```

#### ds.countByValue([done])

This [action] computes the number of occurrences of each element in the
dataset and returns an array of elements of type `[v,n]` where `v`
is the element and `n` its number of occurrences. The result is
passed to the *done()* callback if provided, otherwise an [ES6
promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([ 1, 2, 3, 1, 3, 2, 5 ]).
   countByValue().then(console.log);
// [ [ 1, 2 ], [ 2, 2 ], [ 3, 2 ], [ 5, 1 ] ]
```

#### ds.distinct()

Returns a dataset where duplicates are removed.

Example:

```javascript
sc.parallelize([ 1, 2, 3, 1, 4, 3, 5 ]).
   distinct().
   collect().then(console.log);
// [ 1, 2, 3, 4, 5 ]
```

#### ds.filter(filter[, obj])

- *filter*: a function of the form `callback(element[, obj[, wc]])`,
  returning a *Boolean* and where:
    - *element*: the next element of the dataset on which `filter()` operates
    - *obj*: the same parameter *obj* passed to `filter()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.

Applies the provided filter function to each element of the source
dataset and returns a new dataset containing the elements that passed the
test.

Example:

```javascript
function filter(data, obj) { return data % obj.modulo; }

sc.parallelize([1, 2, 3, 4]).
   filter(filter, {modulo: 2}).
   collect().then(console.log);
// [ 1, 3 ]
```

#### ds.first([done])

This [action] returns the first element of the dataset.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

```javascript
sc.parallelize([1, 2, 3]).first().then(console.log);
// 1
```

#### ds.flatMap(flatMapper[, obj])

Applies the provided *flatMapper* function to each element of the source
dataset and returns a new dataset made of the concatenated elements of
the arrays returned by *flatMapper*.

- *flatMapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an *Array* and where:
    - *element*: the next element of the dataset on which `flatMap()` operates
    - *obj*: the same parameter *obj* passed to `flatMap()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.

Example:

```javascript
sc.range(5).flatMap(a => [a, a]).collect().then(console.log);
// [ 0, 0, 1, 1, 2, 2, 3, 3, 4, 4 ]
```

#### ds.flatMapValues(flatMapper[, obj])

Applies the provided *flatMapper* function to the value of each `[k,v]`
element of the source dataset and returns a new dataset containing an
element `[k, w]` for each value `w` of the array returned by *flatMapper*,
keeping the key unchanged for each source element.

- *flatMapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an *Array* and where:
    - *element*: the value `v` of the next `[k,v]` element of the dataset on
      which `flatMapValues()` operates
    - *obj*: the same parameter *obj* passed to `flatMapValues()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.

Example:

```javascript
function valueFlatMapper(data, obj) {
  var tmp = [];
  for (var i = 0; i < obj.N; i++) tmp.push(data * obj.fact);
  return tmp;
}

sc.parallelize([['hello', 1], ['world', 2]]).
   flatMapValues(valueFlatMapper, {N: 2, fact: 2}).
   collect().then(console.log);
// [ [ 'hello', 2 ], [ 'hello', 2 ], [ 'world', 4 ], [ 'world', 4 ] ]
```

#### ds.forEach(callback[, obj][, done])

This [action] applies a *callback* function on each element of the dataset.
If provided, the *done()* callback is invoked at completion, otherwise an
[ES6 promise] is returned.

- *callback*: a function of the form `function(val[, obj[, wc]])`,
  which returns *null* and with:
    - *val*: the value of the next element of the dataset on which
      `forEach()` operates
    - *obj*: the same parameter *obj* passed to `forEach()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.
- *done*: a callback of the form `function(error, result)` which is
  called at completion.

In the following example, the `console.log()` callback provided
to `forEach()` is executed on workers, so its output may not be visible
on the master:

```javascript
sc.parallelize([1, 2, 3, 4]).
   forEach(console.log).then(() => console.log('finished'));
```

#### ds.groupByKey()

When called on a dataset of type `[k,v]`, returns a dataset of type `[k, [v]]`
where values with the same key are grouped.

Example:

```javascript
sc.parallelize([[10, 1], [20, 2], [10, 4]]).
   groupByKey().collect().then(console.log);
// [ [ 10, [ 1, 4 ] ], [ 20, [ 2 ] ] ]
```

#### ds.intersection(other)

Returns a dataset containing only elements found in both the source dataset
and the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.intersection(ds2).collect().then(console.log); // [ 3, 4, 5 ]
```

#### ds.join(other)

When called on source dataset of type `[k,v]` and *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs with all pairs
of elements for each key present in both datasets.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.join(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ] ]
```

#### ds.keys()

When called on a source dataset of type `[k,v]`, returns a dataset with just
the elements `k`.

Example:

```javascript
sc.parallelize([[10, 'world'], [30, 3]]).
   keys().collect().then(console.log);
// [ 10, 30 ]
```

#### ds.leftOuterJoin(other)

When called on source dataset of type `[k,v]` and *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs for each key of the
source dataset, where `w` is null if the key is not present in the
*other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.leftOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 20, [ 2, null ] ] ]
```

#### ds.lookup(k[, done])

When called on a source dataset of type `[k,v]`, returns an array
of the values `v` for key `k`.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([[10, 'world'], [20, 2], [10, 1], [30, 3]]).
   lookup(10).then(console.log);
// [ 'world', 1 ]
```

#### ds.map(mapper[, obj])

Applies the provided mapper function to each element of the source
dataset and returns a new dataset.

- *mapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an element and where:
    - *element*: the next element of the dataset on which `map()` operates
    - *obj*: the same parameter *obj* passed to `map()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.

Example:

```javascript
sc.parallelize([1, 2, 3, 4]).
   map((data, obj) => data * obj.scaling, {scaling: 1.2}).
   collect().then(console.log);
// [ 1.2, 2.4, 3.6, 4.8 ]
```

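The worker context *wc* is not used in the example above. The following
sketch (an illustration only; the `scale` helper and its `factor` field
are hypothetical) uses *wc* to perform a one-time, worker-local
initialization that is then reused across mapper calls:

```javascript
// Hypothetical helper: compute a value once per worker. wc persists
// between calls on the same worker, so the initialization runs once
// per worker rather than once per element.
function scale(x, obj, wc) {
  if (wc.factor === undefined) wc.factor = obj.base * 2; // worker-local init
  return x * wc.factor;
}

sc.parallelize([1, 2, 3, 4]).
   map(scale, {base: 1}).
   collect().then(console.log);
// [ 2, 4, 6, 8 ]
```
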
#### ds.mapValues(mapper[, obj])

- *mapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an element and where:
    - *element*: the value `v` of the next `[k,v]` element of the dataset on
      which `mapValues()` operates
    - *obj*: the same parameter *obj* passed to `mapValues()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.

Applies the provided mapper function to the value of each `[k,v]`
element of the source dataset and returns a new dataset containing elements
defined as `[k, mapper(v)]`, keeping the key unchanged for each
source element.

Example:

```javascript
sc.parallelize([['hello', 1], ['world', 2]]).
   mapValues((a, obj) => a*obj.fact, {fact: 2}).
   collect().then(console.log);
// [ ['hello', 2], ['world', 4] ]
```

#### ds.partitionBy(partitioner)

Returns a dataset partitioned using the specified partitioner. The
purpose of this transformation is not to change the dataset content,
but to increase processing speed by ensuring that the elements
accessed by further transformations reside in the same partition.

Example:

```javascript
var skale = require('skale-engine');
var sc = skale.context();

sc.parallelize([['hello', 1], ['world', 1], ['hello', 2], ['world', 2], ['cedric', 3]])
  .partitionBy(new skale.HashPartitioner(3))
  .collect().then(console.log)
// [ ['world', 1], ['world', 2], ['hello', 1], ['hello', 2], ['cedric', 3] ]
```

#### ds.persist()

Returns the dataset, and persists the dataset content on disk (and
in memory if available) in order to directly reuse content in further
tasks.

Example:

```javascript
var dataset = sc.range(100).map(a => a * a);

// First action: compute dataset
dataset.collect().then(console.log)

// Second action: reuse dataset, avoid map transform
dataset.collect().then(console.log)
```

#### ds.reduce(reducer, init[, obj][, done])

This [action] returns the aggregated value of the elements
of the dataset using a *reducer()* function.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc* and *val*) and with:
    - *acc*: the value of the accumulator, initially set to *init*
    - *val*: the value of the next element of the dataset on which
      `reduce()` operates
    - *obj*: the same parameter *obj* passed to `reduce()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *init*: the initial value of the accumulators that are used by
  *reducer()*. It should be the identity element of the operation
  (i.e. applying it through the function should not change the result).
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.
- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([1, 2, 4, 8]).
   reduce((a, b) => a + b, 0).
   then(console.log);
// 15
```

#### ds.reduceByKey(reducer, init[, obj])

- *reducer*: a function of the form `callback(acc, val[, obj[, wc]])`,
  returning the next value of the accumulator (which must be of the
  same type as *acc* and *val*) and where:
    - *acc*: the value of the accumulator, initially set to *init*
    - *val*: the value `v` of the next `[k,v]` element of the dataset on
      which `reduceByKey()` operates
    - *obj*: the same parameter *obj* passed to `reduceByKey()`
    - *wc*: the worker context, a persistent object local to each
      worker, where the user can store and access worker local dependencies.
- *init*: the initial value of the accumulator for each key. Will be
  passed to *reducer*.
- *obj*: user provided data, useful for carrying serializable data from
  master to workers; obj is shared amongst mapper executions over each
  element of the dataset.

When called on a dataset of type `[k,v]`, returns a dataset of type `[k,v]`
where the values of each key are aggregated using the *reducer*
function and the *init* initial value.

Example:

```javascript
sc.parallelize([[10, 1], [10, 2], [10, 4]]).
   reduceByKey((a,b) => a+b, 0).
   collect().then(console.log);
// [ [10, 7] ]
```

#### ds.rightOuterJoin(other)

When called on source dataset of type `[k,v]` and *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs for each key of the
*other* dataset, where `v` is null if the key is not present in the
source dataset.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.rightOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 30, [ null, 3 ] ] ]
```

#### ds.sample(withReplacement, frac, seed)

- *withReplacement*: *Boolean* value, *true* if data must be sampled
  with replacement
- *frac*: *Number* value of the fraction of the source dataset to return
- *seed*: *Number* value of pseudo-random seed

Returns a dataset by sampling a fraction *frac* of the source dataset, with or
without replacement, using a given random generator *seed*.

Example:

```javascript
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8]).
   sample(true, 0.5, 0).
   collect().then(console.log);
// [ 1, 1, 3, 4, 4, 5, 7 ]
```

#### ds.sortBy(keyfunc[, ascending])

Returns a dataset sorted by the given *keyfunc*.

- *keyfunc*: a function of the form `function(element)` which returns
  a value used for comparison in the sort function and where `element`
  is the next element of the dataset on which `sortBy()` operates
- *ascending*: a boolean to set the sort direction. Default: true

Example:

```javascript
sc.parallelize([4, 6, 10, 5, 1, 2, 9, 7, 3, 0])
  .sortBy(a => a)
  .collect().then(console.log)
// [ 0, 1, 2, 3, 4, 5, 6, 7, 9, 10 ]
```

#### ds.sortByKey(ascending)

When called on a dataset of type `[k,v]`, returns a dataset of type `[k,v]`
sorted on `k`. The optional parameter *ascending* is a boolean which sets
the sort direction, true by default.

Example:

```javascript
sc.parallelize([['world', 2], ['cedric', 3], ['hello', 1]])
  .sortByKey()
  .collect().then(console.log)
// [['cedric', 3], ['hello', 1], ['world', 2]]
```

#### ds.subtract(other)

Returns a dataset containing only elements of source dataset which
are not in *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.subtract(ds2).collect().then(console.log);
// [ 1, 2 ]
```

#### ds.take(num[, done])

This [action] returns an array of the `num` first elements of the
source dataset. The result is passed to the *done()* callback if
provided, otherwise an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.range(5).take(2).then(console.log);
// [0, 1]
```

#### ds.top(num[, done])

This [action] returns an array of the `num` top elements of the
source dataset. The result is passed to the *done()* callback if
provided, otherwise an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.range(5).top(2).then(console.log);
// [3, 4]
```

#### ds.union(other)

Returns a dataset that contains the union of the elements in the source
dataset and the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.union(ds2).collect().then(console.log);
// [ 1, 2, 3, 4, 5, 3, 4, 5, 6, 7 ]
```

#### ds.values()

When called on a source dataset of type `[k,v]`, returns a dataset with just
the elements `v`.

Example:

```javascript
sc.parallelize([[10, 'world'], [30, 3]]).
   values().collect().then(console.log);
// [ 'world', 3 ]
```

### Partitioners

A partitioner is an object passed to
[ds.partitionBy(partitioner)](#dspartitionbypartitioner) which
places data in partitions according to a strategy, for example hash
partitioning, where data having the same key are placed in the same
partition, or range partitioning, where data in the same range are
in the same partition. This is useful to accelerate processing, as
it limits data transfers between workers during jobs.

A partitioner object must provide the following properties:

- *numPartitions*: a *Number* of partitions for the dataset
- *getPartitionIndex*: a *Function* of type `function(element)`
  which returns the partition index (between 0 and
  *numPartitions* - 1) for the `element` of the dataset on which
  `partitionBy()` operates.

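Given this interface, a custom partitioner can be written directly. The
following sketch (an illustration only; `ModuloPartitioner` is a
hypothetical name, not part of the library) partitions a `[k,v]` dataset
with numeric keys by a simple modulo on the key:

```javascript
// Minimal custom partitioner sketch: provides the two required
// properties, numPartitions and getPartitionIndex(element), the
// latter returning an index between 0 and numPartitions - 1.
function ModuloPartitioner(numPartitions) {
  this.numPartitions = numPartitions;
  this.getPartitionIndex = function (element) {
    return element[0] % numPartitions; // partition on the key of [k,v]
  };
}

var dataset = sc.range(10).
   map(a => [a, a * a]).
   partitionBy(new ModuloPartitioner(3));
```
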
#### HashPartitioner(numPartitions)

Returns a partitioner object which implements hash based partitioning
using a hash checksum of each element as a string.

- *numPartitions*: *Number* of partitions for this dataset

Example:

```javascript
var hp = new skale.HashPartitioner(3)
var dataset = sc.range(10).partitionBy(hp)
```

#### RangePartitioner(numPartitions, keyfunc, dataset)

Returns a partitioner object which first defines ranges by sampling
the dataset and then places elements by comparing them with ranges.

- *numPartitions*: *Number* of partitions for this dataset
- *keyfunc*: a function of the form `function(element)` which returns
  a value used for comparison in the sort function and where `element`
  is the next element of the dataset on which `partitionBy()` operates
- *dataset*: the dataset object on which `partitionBy()` operates

Example:

```javascript
var dataset = sc.range(100)
var rp = new skale.RangePartitioner(3, a => a, dataset)
dataset = dataset.partitionBy(rp)
```

[readable stream]: https://nodejs.org/api/stream.html#stream_class_stream_readable
[ES6 promise]: https://promisesaplus.com
[action]: #actions