Java大数据开发入门系列(四)————Spark之RDD

RDD简介

RDD 全称为 Resilient Distributed Datasets,是 Spark 最基本的数据抽象。可以简单地把 RDD 理解成一个提供了许多操作接口的数据集合。和一般数据集不同的是,其实际数据被划分为一到多个分区,所有介区数据分布存储于一批机器中(内存或磁盘中),这里的分区可以简单地和 Hadoop HDFS 里面的文件块来对比理解。如图所示:

1600093597352

定义了一个名字为“myRDD”的 RDD 数据集,这个数据集被切分成了多个分区(Partion,可以对比 HDFS 的 Block 的概念来理解),可能每个分区实际存储在不同的机器上,同时也可能存储在内存(Memory)或硬盘上(HDFS,当然也可能是其他分布式文件系统)。

RDD是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他 RDD 转换而来,它具有以下特性:

  • 一个 RDD 由一个或者多个分区(Partitions)组成。对于 RDD 来说,每个分区会被一个计算任务所处理,用户可以在创建 RDD 时指定其分区个数,如果没有指定,则默认采用程序所分配到的 CPU 的核心数;
  • RDD 拥有一个用于计算分区的函数 compute;
  • RDD 会保存彼此间的依赖关系,RDD 的每次转换都会生成一个新的依赖关系,这种 RDD 之间的依赖关系就像流水线一样。在部分分区数据丢失后,可以通过这种依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算;
  • Key-Value 型的 RDD 还拥有 Partitioner(分区器),用于决定数据被存储在哪个分区中,目前 Spark 中支持 HashPartitioner(按照哈希分区) 和 RangeParationer(按照范围进行分区);
  • 一个优先位置列表 (可选),用于存储每个分区的优先位置 (prefered location)。对于一个 HDFS 文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算“的理念,Spark 在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理数据块的存储位置。

RDD两大类操作(spark 常用的算子)

RDD主要有两大类操作,分别为转换操作(Tiansformations)和行动操作(Actions))。转换操作主要是指把原始数据集加载到 RDD 以及把一个 RDD 转换为另外一个 RDD,而行动操作主要指把RDD 存储到硬盘或触发转换执行。例如,map 是一个 Transformation 操作,该操作作用于数据集上的每一个元素,并且返回一个新的 RDD 作为结果。而 reduce 是一个 Action 操作,该操作通过一些函数聚合 RDD 中的所有元素并且返回最终的结果给 Driver。

spark 常用的 Transformation 算子

Transformation 算子 Meaning(含义)
map(func) 对原 RDD 中每个元素运用 func 函数,并生成新的 RDD
filter(func) 对原 RDD 中每个元素使用func 函数进行过滤,并生成新的 RDD
flatMap(func) 与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq )。
mapPartitions(func) 与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator => Iterator ,其中 T 是 RDD 的类型,即 RDD[T]
mapPartitionsWithIndex(func) 与 mapPartitions 类似,但 func 类型为 (Int, Iterator) => Iterator ,其中第一个参数为分区索引
sample(withReplacement, fraction, seed) 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed);
union(otherDataset) 合并两个 RDD
intersection(otherDataset) 求两个 RDD 的交集
distinct([numTasks])) 去重
groupByKey([numTasks]) 按照 key 值进行分区,即在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, Iterable) Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKeyaggregateByKey 性能会更好 Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传入 numTasks 参数进行修改。
reduceByKey(func, [numTasks]) 按照 key 值进行分组,并对分组后的数据执行归约操作。
aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks]) 当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey 类似,reduce 任务的数量可通过第二个参数进行配置。
sortByKey([ascending], [numTasks]) 按照 key 进行排序,其中的 key 需要实现 Ordered 特质,即可比较
join(otherDataset, [numTasks]) 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin, rightOuterJoinfullOuterJoin 等算子。
cogroup(otherDataset, [numTasks]) 在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, (Iterable, Iterable)) tuples 的 dataset。
cartesian(otherDataset) 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) 类型的 dataset(即笛卡尔积)。
coalesce(numPartitions) 将 RDD 中的分区数减少为 numPartitions。
repartition(numPartitions) 随机重新调整 RDD 中的数据以创建更多或更少的分区,并在它们之间进行平衡。
repartitionAndSortWithinPartitions(partitioner) 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。

Spark 常用的 Action 算子

Action(动作) Meaning(含义)
reduce(func) 使用函数func执行归约操作
collect() 以一个 array 数组的形式返回 dataset 的所有元素,适用于小结果集。
count() 返回 dataset 中元素的个数。
first() 返回 dataset 中的第一个元素,等价于 take(1)。
take(n) 将数据集中的前 n 个元素作为一个 array 数组返回。
takeSample(withReplacement, num, [seed]) 对一个 dataset 进行随机抽样
takeOrdered(n, [ordering]) 按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。只适用于小结果集,因为所有数据都会被加载到驱动程序的内存中进行排序。
saveAsTextFile(path) 将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。
saveAsSequenceFile(path) 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作要求 RDD 中的元素需要实现 Hadoop 的 Writable 接口。对于 Scala 语言而言,它可以将 Spark 中的基本数据类型自动隐式转换为对应 Writable 类型。(目前仅支持 Java and Scala)
saveAsObjectFile(path) 使用 Java 序列化后存储,可以使用 SparkContext.objectFile() 进行加载。(目前仅支持 Java and Scala)
countByKey() 计算每个键出现的次数。
foreach(func) 遍历 RDD 中每个元素,并对其执行fun函数b

所有的转换都是懒惰(Lazy)操作,它们只是记住了需要这样的转换操作,并不会马上执行,只有等到 Actions 操作的时候才会真正启动计算过程进行计算。举个具体的例子,如图所示,

1600094733358

先经过转换 textFile 把数据从 HDFS 加载到 RDDA 以及 RDDC,这时其实RDD A 或者 RDD C 中都是没有数据的。再到后面的转换 flatMap、map、reduceByKey等,分别把 RDD A 转换为 RDD B 到 RDD F 以及 RDD C 到 RDD E等,这些转换都是没有执行的。可以理解为先做个计划,但是没有具体执行,等到执行操作saveAsSequenceFile时,才开始真正触发并执行任务。

宽依赖和窄依赖

RDD 和它的父 RDD(s) 之间的依赖关系分为两种不同的类型:

  • 窄依赖 (narrow dependency):父 RDDs 的一个分区最多被子 RDDs 一个分区所依赖;
  • 宽依赖 (wide dependency):父 RDDs 的一个分区可以被子 RDDs 的多个子分区所依赖。

如下图,每一个方框表示一个 RDD,带有颜色的矩形表示分区:

EMhehb

区分这两种依赖是非常有用的:

  • 首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)对父分区数据进行计算,例如先执行 map 操作,然后执行 filter 操作。而宽依赖则需要计算好所有父分区的数据,然后再在节点之间进行 Shuffle,这与 MapReduce 类似。
  • 窄依赖能够更有效地进行数据恢复,因为只需重新对丢失分区的父分区进行计算,且不同节点之间可以并行计算;而对于宽依赖而言,如果数据丢失,则需要对所有父分区数据进行计算并再次 Shuffle。

Stage

Spark 中还有一个重要的概念,即 Stage。一般而言,一个 Job 会分成一定数量的 Stage。各个 Stage 之间按照顺序执行。

那么 Siage 是怎么划分的?

在 Spark 中,一个 Job 会被拆分成多组 Task,每组任务就是一个 Stage。而 Spark 中有两类 Task,分别是 ShuffleMapTask和 ResultTask。ShuffleMapTask 的输出是 Shuffle 所需的数据,ResultTask 的输出是最终的结果。

因此 Stage 也以此为依据进行划分,简单地说,Stage 是以 Shuffle 和 Result 这两种类型划分的,Shuffle 之前的所有变换是一个 Stage,Shuftle 之后的操作是另一个 Stage。比如

1
rdd.parallize(1 to 10).foreach(println)

这个操作没有 Shufile,直接就输出了。它的 Task 只有一个,即 ResultTask,Stage 也只有一个。如果是

1
rdd.map(x=>(x, 1).reduceByKey(_ + _).foreach(println)

这个 Job 因为有 reduceByKey 操作,所以有一个 Shuffie 过程,那么 reduceBykey之前的是一个 Stage,执行 shuffleMapTask,输出 Shufle 所需的数据reduceByKey 到最后是一个 Stage,直接就输出结果了。如果 Job 中有多个 Shufle,那么每个 Shufle 之前都是一个 Stage。

Job划分

aKk0mk

如图所示,这是一个 Job 的划分过程。在图中,可以看到有 3 个阶段(Stage)分别是 Stage 1(RDD A )、Stage 2( RDD C、RDD D、RDD E、RDD F)、Stage 3(包含所有 RDD)Spark 会将每一个 Job 划分为多个不同的 Stage,而 Stage 之间的依赖关系则形成了有向无环图(DAG )。对于窄依赖,Spark 会尽量多地将 RDD 转换放在同一个阶段(Stage)中。而对于宽依赖,由于宽依赖通常意味着 Shuffle操作,因此Spark会将Shuffle操作定义为阶段( Stage)的边界。也就是说,Spark 遇到宽依赖就划分为一个 Stage,遇到窄依赖则将这个 RDD 加入到该 Stage 中。因此在图中,RDD C、RDD D、RDD E、RDD F 被构建在一个 Stage 中,RDD A 被构建在一个单独的 Stage 中,而 RDDB 和 RDDG 又被构建在同一个 Stage 中。

RDD调度运行流程

结合前面的介绍,针对 Spark的RDD调度运行流程简单解释如下。

1600096397045

如图所示,用户代码(如rdd1.join…)转换为有向无环图(DAG)后,交给 DAGScheduler,由它把 RDD 的有向无环图分割成各个 Stage 的有向无环图,形成 TaskSet,再提交给 TaskScheduler,由 TaskSeheduler 把任务(Task)提交给每个 Worker 上的 Executor,执行具体的 Task。在 TaskSeheduler 中,是不知道各个 Stage 的存在的,运行的只有 Task。

RDD内存管理(持久化)

Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些 数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。

你能通过persist()或者cache()方法持久化一个rdd。首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术-如果RDD的任何一个分区丢失,它 可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。

此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到 Tachyon中。我们可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY。完整的存储级别介绍如下所示:

Storage Level Meaning
MEMORY_ONLY 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,一些分区将不会被缓存,从而在每次需要这些分区时都需重新计算它们。这是系统默认的存储级别。
MEMORY_AND_DISK 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,将这些不适合存在内存中的分区存储在磁盘中,每次需要时读出它们。
MEMORY_ONLY_SER 将RDD作为序列化的Java对象存储(每个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,但是会更耗费cpu资源—密集的读操作。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。
DISK_ONLY 仅仅将RDD分区存储到磁盘中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面的存储级别类似,但是复制每个分区到集群的两个节点上面
OFF_HEAP (experimental) 以序列化的格式存储RDD到Tachyon中。相对于MEMORY_ONLY_SER,OFF_HEAP减少了垃圾回收的花费,允许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具有更强的吸引力。

NOTE:在python中,存储的对象都是通过Pickle库序列化了的,所以是否选择序列化等级并不重要。

Spark也会自动持久化一些shuffle操作(如reduceByKey)中的中间数据,即使用户没有调用persist方法。这样的好处是避免了在shuffle出错情况下,需要重复计算整个输入。如果用户计划重用 计算过程中产生的RDD,我们仍然推荐用户调用persist方法。

如何选择存储级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

  • 如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。

  • 如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。

  • 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。

  • 如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。

  • 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:

    • 它运行多个执行者共享Tachyon中相同的内存池

    • 它显著地减少垃圾回收的花费

    • 如果单个的执行者崩溃,缓存的数据不会丢失

回收策略

为了管理有限的内存资源,我们在 RDD的层面上采用 LRU(最近最少使用)回收策略。当一个新的 RDD 分区被计算但是没有足够的内存空间来存储这个分区的数据的时候,我们回收掉最近很少使用的 RDD 的分区数据的占用内存,如果这个 RDD 和这个新的计算分区的 RDD 时同一个 RDD 的时候,我们则不对这个分区数据占用的内存做回收。在这种情况下,我们将相同的 RDD 的老分区的数据保存在内存中是为了不让老是重新计算这些分区的数据,这点是非常重要的,因为很多操作都是对整个 RDD 的所有的 tasks 进行计算的,所以非常有必要将后续要用到的数据保存在内存中。到目前为止,我们发现这种默认的机制在所有的应用中工作的很好,但是我们还是将持久每一个 RDD 数据的策略的控制权交给用户。

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
* Spark RDD的持久化
*/
object _01SparkPersistOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

var start = System.currentTimeMillis()
val linesRDD = sc.textFile("D:/data/spark/sequences.txt")
// linesRDD.cache()
// linesRDD.persist(StorageLevel.MEMORY_ONLY)

// 执行第一次RDD的计算
val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// retRDD.cache()
// retRDD.persist(StorageLevel.DISK_ONLY)
retRDD.count()
println("第一次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")

// 执行第二次RDD的计算
start = System.currentTimeMillis()
// linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count()
retRDD.count()
println("第二次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")

// 持久化使用结束之后,要想卸载数据
// linesRDD.unpersist()

sc.stop()

}
}

检查点支持

虽然“血缘关系”可以用于错误后RDD的恢复,但是对于很长的“血缘关系”的RDD来说,这样的恢复耗时比较长,因此需要通过检查点操作(Checkpoint)保存到外部存储中。

通常情况下,对于包含宽依赖的长“血缘关系”的RDD设置检查点操作是非常有用的。在这种情况下,集群中某个节点出现故障时,会使得从各个父RDD计算出的数据丢失,造成需要重新计算。相反,对于那些窄依赖的RDD,对其进行检查点操作就不是有必须。在这种情况下如果一个节点发生故障,RDD在该节点中丢失的分区数据可以通过并行的方式从其他节点中重新计算出来,计算成本只是复制RDD的很小部分。

Spark提供为RDD设置检查点操作的API,可以让用户自行决定需要为那些数据设置检查点操作。另外由于RDD的只读特性,使得不需要关心数据一致性问题, 比常用的共享内存更容易做检查点。

多用户管理

RDD模型将计算分解为多个相互独立的细粒度任务,这使得它在多用户集群能够支持多种资源共享算法。特别地,每个RDD应用可以在执行过程中动态调整访问资源。

  • 在每个应用程序中, Spark 运行多线程同时提交作业,并通过一种等级 公平调度器来实现多个作业对集群资源的共享,这种调度器和Hadoop Fair Scheduler 类似。该算法主要用于创建基于针对相同内存数据的多用户应用,例如: Spark SQL引擎有一个服务模式支持多用户并行查询。公平调度算法确保短的作业能够在即使长作业占满集群资源的情况下尽早完成。
  • Spark的公平调度也使用延迟调度,通过轮询每台机器的数据,在保持公平的情况下给予作业高的本地性。Spark 支持多级本地化访问策略(本地化),包括内存、磁盘和机架。
  • 由于任务相互独立,调度器还支持取消作业来为高优先级的作业腾出资源。
  • Spark中可以使用Mesos来实现细粒度的资源共享,这使得Spark应用能相互之间或在不同的计算框架之间实现资源的动态共享。
  • Spark使用Sparrow系统扩展支持分布式调度,该调度允许多个Spark应用以去中心化的方式在同一集群上排队工作,同时提供数据本地性、低延迟和公平性。