# Skale Reference

- [Overview](#overview)
- [Working with datasets](#working-with-datasets)
  - [Sources](#sources)
  - [Transformations](#transformations)
  - [Actions](#actions)
- [Skale module](#skale-module)
  - [skale.context([config])](#skalecontextconfig)
    - [sc.end()](#scend)
    - [sc.parallelize(array)](#scparallelizearray)
    - [sc.range(start[, end[, step]])](#scrangestart-end-step)
    - [sc.textFile(path)](#sctextfilepath)
    - [sc.lineStream(input_stream)](#sclinestreaminput_stream)
    - [sc.objectStream(input_stream)](#scobjectstreaminput_stream)
  - [Dataset methods](#dataset-methods)
    - [ds.aggregate(reducer, combiner, init[, obj][, done])](#dsaggregatereducer-combiner-init-obj-done)
    - [ds.aggregateByKey(reducer, combiner, init[, obj])](#dsaggregatebykeyreducer-combiner-init-obj)
    - [ds.cartesian(other)](#dscartesianother)
    - [ds.coGroup(other)](#dscogroupother)
    - [ds.collect([done])](#dscollectdone)
    - [ds.count([done])](#dscountdone)
    - [ds.countByKey([done])](#dscountbykeydone)
    - [ds.countByValue([done])](#dscountbyvaluedone)
    - [ds.distinct()](#dsdistinct)
    - [ds.filter(filter[, obj])](#dsfilterfilter-obj)
    - [ds.first([done])](#dsfirstdone)
    - [ds.flatMap(flatMapper[, obj])](#dsflatmapflatmapper-obj)
    - [ds.flatMapValues(flatMapper[, obj])](#dsflatmapvaluesflatmapper-obj)
    - [ds.forEach(callback[, obj][, done])](#dsforeachcallback-obj-done)
    - [ds.groupByKey()](#dsgroupbykey)
    - [ds.intersection(other)](#dsintersectionother)
    - [ds.join(other)](#dsjoinother)
    - [ds.keys()](#dskeys)
    - [ds.leftOuterJoin(other)](#dsleftouterjoinother)
    - [ds.lookup(k[, done])](#dslookupk-done)
    - [ds.map(mapper[, obj])](#dsmapmapper-obj)
    - [ds.mapValues(mapper[, obj])](#dsmapvaluesmapper-obj)
    - [ds.partitionBy(partitioner)](#dspartitionbypartitioner)
    - [ds.persist()](#dspersist)
    - [ds.reduce(reducer, init[, obj][, done])](#dsreducereducer-init-obj-done)
    - [ds.reduceByKey(reducer, init[, obj])](#dsreducebykeyreducer-init-obj)
    - [ds.rightOuterJoin(other)](#dsrightouterjoinother)
    - [ds.sample(withReplacement, frac, seed)](#dssamplewithreplacement-frac-seed)
    - [ds.save(url[, options][, done])](#dssaveurl-options-done)
    - [ds.sortBy(keyfunc[, ascending])](#dssortbykeyfunc-ascending)
    - [ds.sortByKey(ascending)](#dssortbykeyascending)
    - [ds.subtract(other)](#dssubtractother)
    - [ds.take(num[, done])](#dstakenum-done)
    - [ds.top(num[, done])](#dstopnum-done)
    - [ds.union(other)](#dsunionother)
    - [ds.values()](#dsvalues)
- [Partitioners](#partitioners)
  - [HashPartitioner(numPartitions)](#hashpartitionernumpartitions)
  - [RangePartitioner(numPartitions, keyfunc, dataset)](#rangepartitionernumpartitions-keyfunc-dataset)

|
## Overview

Skale is a fast and general purpose distributed data processing
system. It provides a high-level API in Javascript and an optimized
parallel execution engine.

A Skale application consists of a *master* program that runs the
user code and executes various *parallel operations* on a cluster
of *workers*.

The main abstraction Skale provides is a *dataset*, which is similar
to a Javascript *array*, but partitioned across the workers so
that it can be operated on in parallel.

There are several ways to create a dataset: *parallelizing* an existing
array in the master program, referencing a dataset in a distributed
storage system (such as HDFS), or *streaming* the content of any
source that can be processed through Node.js *Streams*. A function
which initializes a dataset is called a *source*.

Datasets support two kinds of operations: *transformations*, which create
a new dataset from an existing one, and *actions*, which
return a value to the *master* program after running a computation
on the dataset.

For example, `map` is a transformation that applies a function to
each element of a dataset, returning a new dataset. On the other
hand, `reduce` is an action that aggregates all elements of a dataset
using some function, and returns the final result to the master.

*Sources* and *transformations* in Skale are *lazy*. They do not
start right away, but are triggered by *actions*, thus allowing
efficient pipelined execution and optimized data transfers.

A first example:

```javascript
var sc = require('skale-engine').context(); // create a new context
sc.parallelize([1, 2, 3, 4]).               // source
   map(function (x) {return x+1}).          // transform
   reduce(function (a, b) {return a+b}, 0). // action
   then(console.log);                       // process result: 14
```

|
## Working with datasets

### Sources

After having initialized a cluster context using
[skale.context()](#skale-context), one can create a dataset
using the following sources:

| Source Name | Description |
| ------------------------------------------------- | ------------------------------------------------------ |
|[gzipFile(path)](#scgzipfilepath) | Create a dataset from a gzipped text file |
|[lineStream(stream)](#sclinestreaminput_stream) | Create a dataset from a text stream |
|[objectStream(stream)](#scobjectstreaminput_stream)| Create a dataset from an object stream |
|[parallelize(array)](#scparallelizearray) | Create a dataset from an array |
|[range(start,end,step)](#scrangestart-end-step) | Create a dataset containing integers from start to end |
|[textFile(path)](#sctextfilepath) | Create a dataset from a text file |

|
### Transformations

Transformations operate on a dataset and return a new dataset. Note that some
transformations operate only on datasets where each element is a
2-element array of key and value (a `[k,v]` dataset):

    [[Ki,Vi], ..., [Kj, Vj]]

A special transformation `persist()` enables one to *persist* a dataset
in memory, allowing efficient reuse across parallel operations.

|
|Transformation Name | Description | in | out |
| ----------------- |-----------------------------------------------|-------|-------|
|[aggregateByKey(func, func, init)](#dsaggregatebykeyreducer-combiner-init-obj)| Reduce and combine by key using functions| [k,v]| [k,v]|
|[cartesian(other)](#dscartesianother) | Perform a cartesian product with the other dataset | v w | [v,w]|
|[coGroup(other)](#dscogroupother) | Group data from both datasets sharing the same key | [k,v] [k,w] |[k,[[v],[w]]]|
|[distinct()](#dsdistinct) | Return a dataset where duplicates are removed | v | v|
|[filter(func)](#dsfilterfilter-obj)| Return a dataset of elements on which the function returns true | v | v|
|[flatMap(func)](#dsflatmapflatmapper-obj)| Pass the dataset elements to a function which returns a sequence | v | w|
|[flatMapValues(func)](#dsflatmapvaluesflatmapper-obj)| Pass the dataset [k,v] elements to a function without changing the keys | [k,v] | [k,w]|
|[groupByKey()](#dsgroupbykey)| Group values with the same key | [k,v] | [k,[v]]|
|[intersection(other)](#dsintersectionother) | Return a dataset containing only elements found in both datasets | v w | v|
|[join(other)](#dsjoinother) | Perform an inner join between 2 datasets | [k,v] | [k,[v,w]]|
|[leftOuterJoin(other)](#dsleftouterjoinother) | Join 2 datasets, keeping all keys of the first | [k,v] | [k,[v,w]]|
|[rightOuterJoin(other)](#dsrightouterjoinother) | Join 2 datasets, keeping all keys of the other | [k,v] | [k,[v,w]]|
|[keys()](#dskeys) | Return a dataset of just the keys | [k,v] | k|
|[map(func)](#dsmapmapper-obj) | Return a dataset where elements are passed through a function | v | w|
|[mapValues(func)](#dsmapvaluesmapper-obj)| Map a function to the value field of a key-value dataset | [k,v] | [k,w]|
|[reduceByKey(func, init)](#dsreducebykeyreducer-init-obj)| Combine values with the same key | [k,v] | [k,w]|
|[partitionBy(partitioner)](#dspartitionbypartitioner)| Partition using the partitioner | v | v|
|[persist()](#dspersist) | Idempotent. Keep content of dataset in cache for further reuse. | v | v|
|[sample(rep, frac, seed)](#dssamplewithreplacement-frac-seed) | Sample a dataset, with or without replacement | v | v|
|[sortBy(func)](#dssortbykeyfunc-ascending) | Sort a dataset | v | v|
|[sortByKey()](#dssortbykeyascending) | Sort a [k,v] dataset | [k,v] | [k,v]|
|[subtract(other)](#dssubtractother) | Remove the elements of the other dataset | v w | v|
|[union(other)](#dsunionother) | Return a dataset containing elements from both datasets | v | v w|
|[values()](#dsvalues) | Return a dataset of just the values | [k,v] | v|

|
### Actions

Actions operate on a dataset and send back results to the *master*. Results
are always produced asynchronously and sent to an optional callback function,
or alternatively delivered through a returned [ES6 promise].

|
|Action Name | Description | out|
|------------------ |----------------------------------------------|--------------|
|[aggregate(func, func, init)](#dsaggregatereducer-combiner-init-obj-done)| Similar to reduce() but may return a different type| value |
|[collect()](#dscollectdone) | Return the content of the dataset | array of elements|
|[count()](#dscountdone) | Return the number of elements in the dataset | number|
|[countByKey()](#dscountbykeydone) | Return the number of occurrences for each key in a `[k,v]` dataset | array of [k,number]|
|[countByValue()](#dscountbyvaluedone) | Return the number of occurrences of each element in the dataset | array of [v,number]|
|[first()](#dsfirstdone) | Return the first element in the dataset | value |
|[forEach(func)](#dsforeachcallback-obj-done)| Apply the provided function to each element of the dataset | empty |
|[lookup(k)](#dslookupk-done) | Return the list of values `v` for key `k` in a `[k,v]` dataset | array of v|
|[reduce(func, init)](#dsreducereducer-init-obj-done)| Aggregate dataset elements using a function into one value | value|
|[take(num)](#dstakenum-done) | Return the first `num` elements of the dataset | array of value|
|[top(num)](#dstopnum-done) | Return the top `num` elements of the dataset | array of value|
|[save(url)](#dssaveurl-options-done) | Save the content of the dataset to a URL | empty |

|
## Skale module

The Skale module is the main entry point for Skale functionality.
To use it, one must `require('skale-engine')`.

### skale.context([config])

Creates and returns a new context which represents the connection
to the Skale cluster, and which can be used to create datasets on that
cluster. *Config* is an *Object* which defines the cluster server,
with the following defaults:

```javascript
{
  host: 'localhost', // Cluster server host, settable also by SKALE_HOST env
  port: '12346'      // Cluster server port, settable also by SKALE_PORT env
}
```

Example:

```javascript
var skale = require('skale-engine');
var sc = skale.context();
```

|
#### sc.end()

Closes the connection to the cluster.

|
#### sc.parallelize(array)

Returns a new dataset containing the elements of the *Array* *array*.

Example:

```javascript
var a = sc.parallelize(['Hello', 'World']);
```

#### sc.range(start[, end[, step]])

Returns a new dataset of integers from *start* to *end* (exclusive),
incremented by *step* (default 1) for every element. If called with a
single argument, the argument is interpreted as *end*, and *start*
is set to 0.

```javascript
sc.range(5).collect().then(console.log)
// [ 0, 1, 2, 3, 4 ]
sc.range(2, 4).collect().then(console.log)
// [ 2, 3 ]
sc.range(10, -5, -3).collect().then(console.log)
// [ 10, 7, 4, 1, -2 ]
```

|
#### sc.textFile(path)

Returns a new dataset of the lines in the file specified by the *String*
*path*.

If *path* ends with a '/' (directory separator), then the dataset
will be composed of all the files in the directory. Sub-directories
are not supported.

If a file name ends with '.gz', then its content will be automatically
uncompressed using GZIP.

Note: if using a path on the local filesystem, the file must also
be accessible at the same path on worker nodes. Either copy the
file to all workers or use a network-mounted shared file system.

Example: the following program prints the length of a text file:

```javascript
var lines = sc.textFile('data.txt');
lines.map(s => s.length).reduce((a, b) => a + b, 0).then(console.log);
```

#### sc.lineStream(input_stream)

Returns a new dataset of lines of text read from the *Object*
*input_stream*, a [readable stream] from which the dataset content is
read.

The following example computes the size of a file using streams:

```javascript
var fs = require('fs');

var stream = fs.createReadStream('data.txt', 'utf8');
sc.lineStream(stream).
   map(s => s.length).
   reduce((a, b) => a + b, 0).
   then(console.log);
```

|
#### sc.objectStream(input_stream)

Returns a new dataset of Javascript *Objects* read from the *Object*
*input_stream*, a [readable stream] from which the dataset content is
read.

The following example counts the number of objects returned in an
object stream using the mongodb native Javascript driver:

```javascript
var cursor = db.collection('clients').find();
sc.objectStream(cursor).count().then(console.log);
```

### Dataset methods

Dataset objects, as created initially by the above skale context source
functions, have the following methods, allowing either to instantiate
a new dataset through a transformation, or to return results to the
master program.

|
#### ds.aggregate(reducer, combiner, init[, obj][, done])

This [action] computes the aggregated value of the elements
of the dataset using two functions, *reducer()* and *combiner()*,
allowing the use of an arbitrary accumulator type, different from the
element type (as opposed to `reduce()`, which imposes the same type for
accumulator and element).
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc*) and with:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value of the next element of the dataset on which
    `aggregate()` operates
  - *obj*: the same parameter *obj* passed to `aggregate()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker-local dependencies.
- *combiner*: a function of the form `function(acc1, acc2[, obj])`,
  which returns the merged value of the accumulators and with:
  - *acc1*: the value of an accumulator, computed locally on a worker
  - *acc2*: the value of another accumulator, issued by another worker
  - *obj*: the same parameter *obj* passed to `aggregate()`
- *init*: the initial value of the accumulators that are used by
  *reducer()* and *combiner()*. It should be the identity element
  of the operation (a neutral zero value, i.e. applying it through the
  function should not change the result).
- *obj*: user provided data, a way to pass serializable data from the
  master to the workers; *obj* is shared amongst executions over each
  element of the dataset.
- *done*: a callback of the form `function(error, result)` which is
  called at completion. If *undefined*, `aggregate()` returns an
  [ES6 promise].

The following example computes the average of a dataset, avoiding a `map()`:

```javascript
sc.parallelize([3, 5, 2, 7, 4, 8]).
   aggregate((a, v) => [a[0] + v, a[1] + 1],
             (a1, a2) => [a1[0] + a2[0], a1[1] + a2[1]], [0, 0]).
   then(function(data) {
     console.log(data[0] / data[1]);
   })
// 4.8333
```

|
#### ds.aggregateByKey(reducer, combiner, init[, obj])

When called on a dataset of type `[k,v]`, returns a dataset of type
`[k,v]` where `v` is the aggregated value of all elements with the
same key `k`. The aggregation is performed using two functions,
*reducer()* and *combiner()*, allowing the use of an arbitrary
accumulator type, different from the element type.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc*) and with:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value `v` of the next `[k,v]` element of the dataset
    on which `aggregateByKey()` operates
  - *obj*: the same parameter *obj* passed to `aggregateByKey()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker-local dependencies.
- *combiner*: a function of the form `function(acc1, acc2[, obj])`,
  which returns the merged value of the accumulators and with:
  - *acc1*: the value of an accumulator, computed locally on a worker
  - *acc2*: the value of another accumulator, issued by another worker
  - *obj*: the same parameter *obj* passed to `aggregateByKey()`
- *init*: the initial value of the accumulators that are used by
  *reducer()* and *combiner()*. It should be the identity element
  of the operation (a neutral zero value, i.e. applying it through the
  function should not change the result).
- *obj*: user provided data, a way to pass serializable data from the
  master to the workers; *obj* is shared amongst executions over each
  element of the dataset.

Example:

```javascript
sc.parallelize([['hello', 1], ['hello', 1], ['world', 1]]).
   aggregateByKey((a, b) => a + b, (a, b) => a + b, 0).
   collect().then(console.log);
// [ [ 'hello', 2 ], [ 'world', 1 ] ]
```

|
#### ds.cartesian(other)

Returns a dataset which contains all possible pairs `[a, b]` where `a`
is in the source dataset and `b` is in the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2]);
var ds2 = sc.parallelize(['a', 'b', 'c']);
ds1.cartesian(ds2).collect().then(console.log);
// [ [ 1, 'a' ], [ 1, 'b' ], [ 1, 'c' ],
//   [ 2, 'a' ], [ 2, 'b' ], [ 2, 'c' ] ]
```

#### ds.coGroup(other)

When called on datasets of type `[k,v]` and `[k,w]`, returns a dataset of type
`[k, [[v], [w]]]`, where data of both datasets share the same key.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.coGroup(ds2).collect().then(console.log);
// [ [ 10, [ [ 1 ], [ 'world' ] ] ],
//   [ 20, [ [ 2 ], [] ] ],
//   [ 30, [ [], [ 3 ] ] ] ]
```

|
#### ds.collect([done])

This [action] returns the content of the dataset in the form of an array.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([1, 2, 3, 4]).
   collect(function (err, res) {
     console.log(res);
   });
// [ 1, 2, 3, 4 ]
```

#### ds.count([done])

This [action] computes the number of elements in the dataset. The
result is passed to the *done()* callback if provided, otherwise
an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([10, 20, 30, 40]).count().then(console.log);
// 4
```

|
#### ds.countByKey([done])

When called on a dataset of type `[k,v]`, this [action] computes
the number of occurrences of elements for each key, producing an
array of elements of type `[k,w]` where `w` is the resulting count.
The result is passed to the *done()* callback if provided, otherwise
an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([[10, 1], [20, 2], [10, 4]]).
   countByKey().then(console.log);
// [ [ 10, 2 ], [ 20, 1 ] ]
```

#### ds.countByValue([done])

This [action] computes the number of occurrences of each element in the
dataset and returns an array of elements of type `[v,n]` where `v`
is the element and `n` its number of occurrences. The result is
passed to the *done()* callback if provided, otherwise an [ES6
promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([ 1, 2, 3, 1, 3, 2, 5 ]).
   countByValue().then(console.log);
// [ [ 1, 2 ], [ 2, 2 ], [ 3, 2 ], [ 5, 1 ] ]
```

|
#### ds.distinct()

Returns a dataset where duplicates are removed.

Example:

```javascript
sc.parallelize([ 1, 2, 3, 1, 4, 3, 5 ]).
   distinct().
   collect().then(console.log);
// [ 1, 2, 3, 4, 5 ]
```

#### ds.filter(filter[, obj])

Applies the provided filter function to each element of the source
dataset and returns a new dataset containing the elements that passed the
test.

- *filter*: a function of the form `callback(element[, obj[, wc]])`,
  returning a *Boolean* and where:
  - *element*: the next element of the dataset on which `filter()` operates
  - *obj*: the same parameter *obj* passed to `filter()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker-local dependencies.
- *obj*: user provided data, a way to pass serializable data from the
  master to the workers; *obj* is shared amongst executions over each
  element of the dataset.

Example:

```javascript
function filter(data, obj) { return data % obj.modulo; }

sc.parallelize([1, 2, 3, 4]).
   filter(filter, {modulo: 2}).
   collect().then(console.log);
// [ 1, 3 ]
```

|
#### ds.first([done])

This [action] returns the first element of the dataset.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

```javascript
sc.parallelize([1, 2, 3]).first().then(console.log);
// 1
```

#### ds.flatMap(flatMapper[, obj])

Applies the provided flatMapper function to each element of the source
dataset and returns a new dataset built from the concatenation of the
returned arrays.

- *flatMapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an *Array* and where:
  - *element*: the next element of the dataset on which `flatMap()` operates
  - *obj*: the same parameter *obj* passed to `flatMap()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker-local dependencies.
- *obj*: user provided data, a way to pass serializable data from the
  master to the workers; *obj* is shared amongst executions over each
  element of the dataset.

Example:

```javascript
sc.range(5).flatMap(a => [a, a]).collect().then(console.log);
// [ 0, 0, 1, 1, 2, 2, 3, 3, 4, 4 ]
```

|
#### ds.flatMapValues(flatMapper[, obj])

Applies the provided flatMapper function to the value of each
`[key, value]` element of the source dataset and returns a new dataset
containing an element `[key, w]` for each value `w` of the array
returned by the flatMapper, keeping the key unchanged for each source
element.

- *flatMapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an *Array* and where:
  - *element*: the value `v` of the next `[k,v]` element of the dataset on
    which `flatMapValues()` operates
  - *obj*: the same parameter *obj* passed to `flatMapValues()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker-local dependencies.
- *obj*: user provided data, a way to pass serializable data from the
  master to the workers; *obj* is shared amongst executions over each
  element of the dataset.

Example:

```javascript
function valueFlatMapper(data, obj) {
  var tmp = [];
  for (var i = 0; i < obj.N; i++) tmp.push(data * obj.fact);
  return tmp;
}

sc.parallelize([['hello', 1], ['world', 2]]).
   flatMapValues(valueFlatMapper, {N: 2, fact: 2}).
   collect().then(console.log);
// [ [ 'hello', 2 ], [ 'hello', 2 ], [ 'world', 4 ], [ 'world', 4 ] ]
```

|
#### ds.forEach(callback[, obj][, done])

This [action] applies a *callback* function on each element of the dataset.
If provided, the *done()* callback is invoked at completion, otherwise an
[ES6 promise] is returned.

- *callback*: a function of the form `function(val[, obj[, wc]])`,
  which returns *null* and with:
  - *val*: the value of the next element of the dataset on which
    `forEach()` operates
  - *obj*: the same parameter *obj* passed to `forEach()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker-local dependencies.
- *obj*: user provided data, a way to pass serializable data from the
  master to the workers; *obj* is shared amongst executions over each
  element of the dataset.
- *done*: a callback of the form `function(error, result)` which is
  called at completion.

In the following example, the `console.log()` callback provided
to `forEach()` is executed on the workers, so its output may not be
visible on the master:

```javascript
sc.parallelize([1, 2, 3, 4]).
   forEach(console.log).then(() => console.log('finished'));
```

|
#### ds.groupByKey()

When called on a dataset of type `[k,v]`, returns a dataset of type `[k, [v]]`
where values with the same key are grouped.

Example:

```javascript
sc.parallelize([[10, 1], [20, 2], [10, 4]]).
   groupByKey().collect().then(console.log);
// [ [ 10, [ 1, 4 ] ], [ 20, [ 2 ] ] ]
```

#### ds.intersection(other)

Returns a dataset containing only the elements found in both the source
dataset and the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.intersection(ds2).collect().then(console.log); // [ 3, 4, 5 ]
```

#### ds.join(other)

When called on a source dataset of type `[k,v]` and an *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs with all pairs
of elements for each key.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.join(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ] ]
```

|
#### ds.keys()

When called on a source dataset of type `[k,v]`, returns a dataset with just
the elements `k`.

Example:

```javascript
sc.parallelize([[10, 'world'], [30, 3]]).
   keys().collect().then(console.log);
// [ 10, 30 ]
```

#### ds.leftOuterJoin(other)

When called on a source dataset of type `[k,v]` and an *other* dataset of type
`[k,w]`, returns a dataset of `[k, [v, w]]` pairs containing all the
keys of the source dataset; `w` is *null* when the key is absent from
the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.leftOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 20, [ 2, null ] ] ]
```

|
#### ds.lookup(k[, done])

When called on a source dataset of type `[k,v]`, returns an array
of the values `v` for key `k`.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([[10, 'world'], [20, 2], [10, 1], [30, 3]]).
   lookup(10).then(console.log);
// [ 'world', 1 ]
```

|
#### ds.map(mapper[, obj])

Applies the provided *mapper* function to each element of the source
dataset and returns a new dataset.

- *mapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an element, where:
  - *element*: the next element of the dataset on which `map()` operates
  - *obj*: the same parameter *obj* passed to `map()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user provided data, used to pass serializable data from
  master to workers. It is shared amongst mapper executions over each
  element of the dataset.

Example:

```javascript
sc.parallelize([1, 2, 3, 4]).
map((data, obj) => data * obj.scaling, {scaling: 1.2}).
collect().then(console.log);
// [ 1.2, 2.4, 3.6, 4.8 ]
```

#### ds.mapValues(mapper[, obj])

Applies the provided *mapper* function to the value of each `[k,v]`
element of the source dataset and returns a new dataset containing
elements `[k, mapper(v)]`, keeping the key unchanged for each source
element.

- *mapper*: a function of the form `callback(element[, obj[, wc]])`,
  returning an element, where:
  - *element*: the value `v` of the next `[k,v]` element of the dataset
    on which `mapValues()` operates
  - *obj*: the same parameter *obj* passed to `mapValues()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *obj*: user provided data, used to pass serializable data from
  master to workers. It is shared amongst mapper executions over each
  element of the dataset.

Example:

```javascript
sc.parallelize([['hello', 1], ['world', 2]]).
mapValues((a, obj) => a*obj.fact, {fact: 2}).
collect().then(console.log);
// [ ['hello', 2], ['world', 4] ]
```

#### ds.partitionBy(partitioner)

Returns a dataset partitioned using the specified *partitioner*. The
purpose of this transformation is not to change the dataset content,
but to increase processing speed by ensuring that the elements
accessed by further transformations reside in the same partition.

Example:

```javascript
var skale = require('skale-engine');
var sc = skale.context();

sc.parallelize([['hello', 1], ['world', 1], ['hello', 2], ['world', 2], ['cedric', 3]])
.partitionBy(new skale.HashPartitioner(3))
.collect().then(console.log)
// [ ['world', 1], ['world', 2], ['hello', 1], ['hello', 2], ['cedric', 3] ]
```

#### ds.persist()

Returns the dataset, and persists the dataset content on disk (and
in memory if available) in order to directly reuse content in further
tasks.

Example:

```javascript
var dataset = sc.range(100).map(a => a * a);

// First action: compute dataset
dataset.collect().then(console.log)

// Second action: reuse dataset, avoid map transform
dataset.collect().then(console.log)
```

#### ds.reduce(reducer, init[, obj][, done])

This [action] returns the aggregated value of the elements
of the dataset using a *reducer()* function.
The result is passed to the *done()* callback if provided, otherwise an
[ES6 promise] is returned.

- *reducer*: a function of the form `function(acc, val[, obj[, wc]])`,
  which returns the next value of the accumulator (which must be
  of the same type as *acc* and *val*), with:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value of the next element of the dataset on which
    `reduce()` operates
  - *obj*: the same parameter *obj* passed to `reduce()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *init*: the initial value of the accumulators that are used by
  *reducer()*. It should be the identity element of the operation
  (i.e. applying it through the function should not change the result).
- *obj*: user provided data, used to pass serializable data from
  master to workers. It is shared amongst reducer executions over each
  element of the dataset.
- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.parallelize([1, 2, 4, 8]).
reduce((a, b) => a + b, 0).
then(console.log);
// 15
```

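To see why *init* should be the identity element, consider this plain-JavaScript simulation of a reduce over two partitions (`simulateReduce` is a hypothetical illustration, not part of the skale API). Each partition starts its own accumulator at *init*, and so does the final combine, so a non-identity value is folded in once per accumulator and skews the result:

```javascript
// Each partition reduces locally starting from init, then the
// partial results are combined, again starting from init.
function simulateReduce(partitions, reducer, init) {
  var partials = partitions.map(function (p) { return p.reduce(reducer, init); });
  return partials.reduce(reducer, init);
}

var sum = function (a, b) { return a + b; };
console.log(simulateReduce([[1, 2], [4, 8]], sum, 0));   // 15: 0 is the identity for +
console.log(simulateReduce([[1, 2], [4, 8]], sum, 10));  // 45: 10 is folded in 3 times
```
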
#### ds.reduceByKey(reducer, init[, obj])

When called on a dataset of type `[k,v]`, returns a dataset of type `[k,v]`
where the values of each key are aggregated using the *reducer*
function and the *init* initial value.

- *reducer*: a function of the form `callback(acc, val[, obj[, wc]])`,
  returning the next value of the accumulator (which must be of the
  same type as *acc* and *val*), where:
  - *acc*: the value of the accumulator, initially set to *init*
  - *val*: the value `v` of the next `[k,v]` element of the dataset on
    which `reduceByKey()` operates
  - *obj*: the same parameter *obj* passed to `reduceByKey()`
  - *wc*: the worker context, a persistent object local to each
    worker, where the user can store and access worker local dependencies
- *init*: the initial value of the accumulator for each key, passed to
  *reducer*.
- *obj*: user provided data, used to pass serializable data from
  master to workers. It is shared amongst reducer executions over each
  element of the dataset.

Example:

```javascript
sc.parallelize([[10, 1], [10, 2], [10, 4]]).
reduceByKey((a,b) => a+b, 0).
collect().then(console.log);
// [ [10, 7] ]
```

#### ds.rightOuterJoin(other)

When called on a source dataset of type `[k,v]` and an *other* dataset of
type `[k,w]`, returns a dataset of `[k, [v, w]]` pairs for each key `k`
present in the *other* dataset. If a key has no match in the source, `v` is
set to `null`.

Example:

```javascript
var ds1 = sc.parallelize([[10, 1], [20, 2]]);
var ds2 = sc.parallelize([[10, 'world'], [30, 3]]);
ds1.rightOuterJoin(ds2).collect().then(console.log);
// [ [ 10, [ 1, 'world' ] ], [ 30, [ null, 3 ] ] ]
```

#### ds.sample(withReplacement, frac, seed)

- *withReplacement*: *Boolean* value, *true* if data must be sampled
  with replacement
- *frac*: *Number* value of the fraction of the source dataset to return
- *seed*: *Number* value of the pseudo-random seed

Returns a dataset by sampling a fraction *frac* of the source dataset, with or
without replacement, using the given random generator *seed*.

Example:

```javascript
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8]).
sample(true, 0.5, 0).
collect().then(console.log);
// [ 1, 1, 3, 4, 4, 5, 7 ]
```

#### ds.save(url[, options][, done])

This [action] saves the content of the dataset to the destination *url*. The
destination is a flat directory which will contain as many files as there are
partitions in the dataset. Files are named after their partition number,
starting at 0. The file format is a stream of JSON strings (one per dataset
element) separated by newlines.

- *url*: a *String* of the general form `protocol://host/path` or `/path`. See
  below for supported protocols
- *options*: an *Object* with the following fields:
  - *gzip*: *Boolean* (default false) to enable gzip compression. If compression
    is enabled, files are suffixed with `.gz`
- *done*: an optional callback function of the form `function(error, result)`
  called at completion. If not provided, an [ES6 promise] is returned.

##### File protocol

The URL form is `file://path` or simply `path`, where *path* is an absolute
pathname in the master host local file system.

Example:

```javascript
sc.range(300).save('/tmp/results/').then(() => sc.end());
// will produce /tmp/results/0, /tmp/results/1
```

##### AWS S3 protocol

The URL form is `s3://bucket/key`. AWS credentials must be provided by the
environment variables `AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID`.

Example:

```javascript
sc.range(300).save('s3://myproject/mydataset', {gzip: true}).then(() => sc.end());
// will produce https://myproject.s3.amazonaws.com/mydataset/0.gz
```

#### ds.sortBy(keyfunc[, ascending])

Returns a dataset sorted by the given *keyfunc*.

- *keyfunc*: a function of the form `function(element)` which returns
  a value used for comparison in the sort function and where `element`
  is the next element of the dataset on which `sortBy()` operates
- *ascending*: a boolean to set the sort direction. Default: true

Example:

```javascript
sc.parallelize([4, 6, 10, 5, 1, 2, 9, 7, 3, 0])
.sortBy(a => a)
.collect().then(console.log)
// [ 0, 1, 2, 3, 4, 5, 6, 7, 9, 10 ]
```

#### ds.sortByKey(ascending)

When called on a dataset of type `[k,v]`, returns a dataset of type `[k,v]`
sorted on `k`. The optional parameter *ascending* is a boolean which sets
the sort direction, true by default.

Example:

```javascript
sc.parallelize([['world', 2], ['cedric', 3], ['hello', 1]])
.sortByKey()
.collect().then(console.log)
// [['cedric', 3], ['hello', 1], ['world', 2]]
```

#### ds.subtract(other)

Returns a dataset containing only elements of the source dataset which
are not in the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.subtract(ds2).collect().then(console.log);
// [ 1, 2 ]
```

#### ds.take(num[, done])

This [action] returns an array of the `num` first elements of the
source dataset. The result is passed to the *done()* callback if
provided, otherwise an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.range(5).take(2).then(console.log);
// [0, 1]
```

#### ds.top(num[, done])

This [action] returns an array of the `num` top elements of the
source dataset. The result is passed to the *done()* callback if
provided, otherwise an [ES6 promise] is returned.

- *done*: a callback of the form `function(error, result)` which is
  called at completion.

Example:

```javascript
sc.range(5).top(2).then(console.log);
// [3, 4]
```

#### ds.union(other)

Returns a dataset that contains the union of the elements in the source
dataset and the *other* dataset.

Example:

```javascript
var ds1 = sc.parallelize([1, 2, 3, 4, 5]);
var ds2 = sc.parallelize([3, 4, 5, 6, 7]);
ds1.union(ds2).collect().then(console.log);
// [ 1, 2, 3, 4, 5, 3, 4, 5, 6, 7 ]
```

#### ds.values()

When called on a source dataset of type `[k,v]`, returns a dataset of just
the values `v`.

Example:

```javascript
sc.parallelize([[10, 'world'], [30, 3]]).
values().collect().then(console.log);
// [ 'world', 3 ]
```

### Partitioners

A partitioner is an object passed to
[ds.partitionBy(partitioner)](#dspartitionbypartitioner) which
places data in partitions according to a strategy, for example hash
partitioning, where data having the same key are placed in the same
partition, or range partitioning, where data in the same range are
in the same partition. This is useful to accelerate processing, as
it limits data transfers between workers during jobs.

A partitioner object must provide the following properties:

- *numPartitions*: a *Number* of partitions for the dataset
- *getPartitionIndex*: a *Function* of the form `function(element)`
  which returns the partition index (between 0 inclusive and
  *numPartitions* exclusive) for the `element` of the dataset on which
  `partitionBy()` operates.

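Any object with these two properties works. For instance, a minimal custom partitioner that places `[k,v]` elements by the parity of their numeric key could look like this (`ParityPartitioner` is a hypothetical example, not part of the skale API):

```javascript
// Minimal custom partitioner: even keys go to partition 0, odd keys to 1.
function ParityPartitioner() {
  this.numPartitions = 2;
  this.getPartitionIndex = function (element) {
    return Math.abs(element[0]) % 2;     // element is a [k, v] pair
  };
}

var pp = new ParityPartitioner();
// would be used as: dataset.partitionBy(pp)
console.log(pp.getPartitionIndex([4, 'a']));  // 0
console.log(pp.getPartitionIndex([7, 'b']));  // 1
```
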
#### HashPartitioner(numPartitions)

Returns a partitioner object which implements hash based partitioning
using a hash checksum of each element as a string.

- *numPartitions*: *Number* of partitions for this dataset

Example:

```javascript
var hp = new skale.HashPartitioner(3)
var dataset = sc.range(10).partitionBy(hp)
```

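The idea behind hash partitioning can be sketched in plain JavaScript (illustrative only; `hashIndex` is a hypothetical helper and skale's actual checksum function differs):

```javascript
// Derive a stable partition index from the string form of an element.
function hashIndex(element, numPartitions) {
  var s = String(element), h = 0;
  for (var i = 0; i < s.length; i++) h = (h * 31 + s.charCodeAt(i)) >>> 0;
  return h % numPartitions;
}

// The same element always lands in the same partition:
console.log(hashIndex('hello', 3) === hashIndex('hello', 3));  // true
```

Because the index depends only on the element itself, all elements with the same key reach the same partition without any coordination between workers.
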
#### RangePartitioner(numPartitions, keyfunc, dataset)

Returns a partitioner object which first defines ranges by sampling
the dataset and then places elements by comparing them with ranges.

- *numPartitions*: *Number* of partitions for this dataset
- *keyfunc*: a function of the form `function(element)` which returns
  a value used for comparison in the sort function and where `element`
  is the next element of the dataset on which `partitionBy()` operates
- *dataset*: the dataset object on which `partitionBy()` operates

Example:

```javascript
var dataset = sc.range(100)
var rp = new skale.RangePartitioner(3, a => a, dataset)
var partitioned = sc.range(10).partitionBy(rp)
```

[readable stream]: https://nodejs.org/api/stream.html#stream_class_stream_readable
[ES6 promise]: https://promisesaplus.com
[action]: #actions
|