RDD
Spark RDD 编程
RDD 编程基础
RDD 创建
从文件系统中加载数据创建 RDD
Spark 采用 textFile() 方法来从文件系统中加载数据创建 RDD
该方法把文件的 URI 作为参数,这个 URI 可以是
- 本地文件系统的地址
- 分布式文件系统
HDFS的地址 - AmazonS3 的地址
- 等等
从本地文件系统中加载数据创建 RDD
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") |

从分布式文件系统 HDFS 中加载数据
>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") |
通过并行集合(列表)创建 RDD
可以调用 SparkContext 的 parallelize 方法,在 Driver 中一个已经存在的集合(列表)上创建。
>>> array = [1,2,3,4,5] |

RDD 操作
转换操作
对于 RDD 而言,每一次转换操作都会产生不同的 RDD,供给下一个 “转换” 使用.
转换得到的 RDD 是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作.
| 操作 | 含义 |
|---|---|
| filter(func) | 筛选出满足函数 func 的元素,并返回一个新的数据集 |
| map(func) | 将每个元素传递到函数 func 中,并将结果返回为一个新的数据集 |
| flatMap(func) | 与 map () 相似,但每个输入元素都可以映射到 0 或多个输出结果 |
| groupByKey() | 应用于 (K,V) 键值对的数据集时,返回一个新的 (K,Iterable) 形式的数据集 |
| reduceByKey(func) | 应用于 (K,V) 键值对的数据集时,返回一个新的 (K,V) 形式的数据集,其中每个值是将每个 key 传递到函数 func 中进行聚合后的结果 |
filter(func)
筛选出满足函数 func 的元素,并返回一个新的数据集
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") |

map(func)
map(func) 操作将每个元素传递到函数 func 中,并将结果返回为一个新的数据集
>>> data = [1,2,3,4,5] |

flatMap(func)
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") |

groupByKey()
应用于 (K,V) 键值对的数据集时,返回一个新的 (K, Iterable) 形式的数据集
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \ |

reduceByKey(func)
应用于 (K,V) 键值对的数据集时,返回一个新的 (K, V) 形式的数据集,其中的每个值是将每个 key 传递到函数 func 中进行聚合后得到的结果
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \ |

行动操作
行动操作是真正触发计算的地方。Spark 程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
| 操作 | 含义 |
|---|---|
| count() | 返回数据集中的元素个数 |
| collect() | 以数组的形式返回数据集中的所有元素 |
| first() | 返回数据集中的第一个元素 |
| take(n) | 以数组的形式返回数据集中的前 n 个元素 |
| reduce(func) | 通过函数 func(输入两个参数并返回一个值)聚合数据集中的元素 |
| foreach(func) | 将数据集中的每个元素传递到函数 func 中运行 |
惰性机制
所谓的 “惰性机制” 是指,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发 “从头到尾” 的真正的计算
这里给出一段简单的语句来解释 Spark 的惰性机制
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") |
持久化
在 Spark 中,RDD 采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据
>>> list = ["Hadoop","Spark","Hive"] |
- 可以通过持久化(缓存)机制避免这种重复计算的开销
- 可以使用
persist()方法对一个 RDD 标记为持久化 - 之所以说 “标记为持久化”,是因为出现
persist()语句的地方,并不会马上计算生成 RDD 并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化 - 持久化后的 RDD 将会被保留在计算节点的内存中被后面的行动操作重复使用
>>> list = ["Hadoop","Spark","Hive"] |
分区
RDD 是弹性分布式数据集,通常 RDD 很大,会被分成很多个分区,分别保存在不同的节点上
RDD 分区的一个原则是使得分区的个数尽量等于集群中的 CPU 核心(core)数目
键值对 RDD
键值对 RDD 的创建
从文件中加载
可以采用多种方式创建键值对 RDD,其中一种主要方式是使用 map() 函数来实现
>>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt") |
通过并行集合(列表)创建 RDD
>>> list = ["Hadoop","Spark","Hive","Spark"] |
常用的键值对 RDD 转换操作
reduceByKey(func)
使用 func 函数合并具有相同键的值
>>> pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]) |
groupByKey()
对具有相同键的值进行分组
>>> list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)] |
sortByKey()
返回一个根据键排序的 RDD
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] |
mapValues(func)
对键值对 RDD 中的每个 value 都应用一个函数,但是,key 不会发生变化
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] |
join
join 就表示内连接。对于内连接,对于给定的两个输入数据集 (K,V1) 和 (K,V2),只有在两个数据集中都存在的 key 才会被输出,最终得到一个 (K,(V1,V2)) 类型的数据集。
>>> pairRDD1 = sc. \ |