一、简介
Spark 于 2009 年诞生于加州大学伯克利分校 AMPLab,2013 年被捐赠给 Apache 软件基金会,2014 年 2 月成为 Apache 的顶级项目。相对于 MapReduce 的批处理计算,Spark 可以带来上百倍的性能提升,因此它成为继 MapReduce 之后,最为广泛使用的分布式计算框架。
二、Spark产生背景
Spark 是在 MapReduce 的基础上产生的,借鉴了大量 MapReduce 实践经验,引入多种新型涉及思想和优化策略。针对MapReduce计算框架存在的局限性进行分析,能更好的了解到 Spark。
MapReduce 的局限性如下:
1、处理效率低效
- Map中间结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
- 任务调度和启动开销大
- 无法充分利用内存
- Map 端和 Reduce 端均需要排序
- 复杂功能 Io 开销大,对于复杂 sql,需转换 MapReduce 计算,需要通过 HDFS 进行磁盘数据交换,而读写Hfds需消耗大量磁盘和网络 IO
2、 不适合迭代计算(如机器学习、图计算等),交互式处理(数据挖掘) 和流式处理(点击日志分析)
3、 MapReduce 编程不够灵活
- 仅支持 Map 和 Reduce 两种操作
- 尝试函数式编程风格
4、计算框架多样化、无形中产生运维和管理成本
三、特点
Spark 是基与 MapReduce 基础产生了,克服了其存在的性能低下,变成不够灵活的缺点。
Spark 作为一种 DAG 计算框架,主要特点如下:
Apache Spark 具有以下特点:
- 使用先进的 DAG 调度程序,查询优化器和物理执行引擎,以实现性能上的保证;
- 多语言支持,目前支持的有 Java,Scala,Python 和 R;
- 提供了 80 多个高级 API,可以轻松地构建应用程序;
- 支持批处理,流处理和复杂的业务分析;
- 丰富的类库支持:包括 SQL,MLlib,GraphX 和 Spark Streaming 等库,并且可以将它们无缝地进行组合;
- 丰富的部署模式:支持本地模式和自带的集群模式,也支持在 Hadoop,Mesos,Kubernetes 上运行;
- 多数据源支持:支持访问 HDFS,Alluxio,Cassandra,HBase,Hive 以及数百个其他数据源中的数据。

三、Spark的组成
Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法、机器、人之间展现大数据应用的一个平台。也是处理大数据、云计算、通信的技术解决方案。
它的主要组件有:
- SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。
- SparkSQL:Spark Sql 是 Spark 来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。
- SparkStreaming:是 Spark 提供的实时数据进行流式计算的组件。
- MLlib:提供常用机器学习算法的实现库。
- GraphX:提供一个分布式图计算框架,能高效进行图计算。
- BlinkDB:用于在海量数据上进行交互式SQL的近似查询引擎。
- Tachyon:以内存为中心高容错的的分布式文件系统。
四、集群架构
| Term(术语) | Meaning(含义) |
|---|---|
| Application | Spark 应用程序,由集群上的一个 Driver 节点和多个 Executor 节点组成。 |
| Driver program | 主运用程序,该进程运行 main() 方法并且创建 SparkContext |
| Cluster manager | 集群资源管理器(例如,Standlone Manager,Mesos,YARN) |
| Worker node | 执行计算任务的工作节点 |
| Executor | 位于工作节点上的应用进程,负责执行计算任务并且将输出数据保存到内存或者磁盘中 |
| Task | 被发送到 Executor 中的工作单元 |
| Job | 一个Action算子(比如collect算子)对应一个Job,由并行计算的多个Task组成。 |
| Stage | 每个Job由多个Stage组成,每个Stage是一个Task集合,由DAG分割而成。 |
| Task | 承载业务逻辑的运算单元,是Spark平台中可执行的最小工作单元。一个应用根据执行计划以及计算量分为多个Task。 |
Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。Master作为整个集群的控制器,负责整个集群的正常运行;Worker相当于是计算节点,接收主节点命令与进行状态汇报;Executor负责任务的执行;Client作为用户的客户端负责提交应用,Driver负责控制一个应用的执行。

执行过程:
- 用户程序创建 SparkContext 后,它会连接到集群资源管理器,集群资源管理器会为用户程序分配计算资源,并启动 Executor;
- Driver 将计算程序划分为不同的执行阶段和多个
task,之后将task发送给 Executor; - Executor 负责执行 Task,并将执行状态汇报给 Driver,同时也会将当前节点资源的使用情况汇报给集群资源管理器。
- 因为Driver调度了集群上的
task(任务),更好的方式应该是在相同的局域网中靠近worker的节点上运行。如果你不喜欢发送请求到远程的集群,倒不如打开一个RPC至driver并让它就近提交操作而不是从很远的worker节点上运行一个driver。
driver做什么
- 运行应用程序的main函数
- 创建spark的上下文
- 划分RDD并生成有向无环图(DAGScheduler)
- 与spark中的其他组进行协调,协调资源等等(SchedulerBackend)
- 生成并发送task到executor(taskScheduler)
五、核心组件
Spark 基于 Spark Core 扩展了四个核心组件,分别用于满足不同领域的计算需求。

Spark SQL
Spark SQL 主要用于结构化数据的处理。其具有以下特点:
- 能够将 SQL 查询与 Spark 程序无缝混合,允许您使用 SQL 或 DataFrame API 对结构化数据进行查询;
- 支持多种数据源,包括 Hive,Avro,Parquet,ORC,JSON 和 JDBC;
- 支持 HiveQL 语法以及用户自定义函数 (UDF),允许你访问现有的 Hive 仓库;
- 支持标准的 JDBC 和 ODBC 连接;
- 支持优化器,列式存储和代码生成等特性,以提高查询效率。
Spark Streaming
Spark Streaming 主要用于快速构建可扩展,高吞吐量,高容错的流处理程序。支持从 HDFS,Flume,Kafka,Twitter 和 ZeroMQ 读取数据,并进行处理。

Spark Streaming 的本质是微批处理,它将数据流进行极小粒度的拆分,拆分为多个批处理,从而达到接近于流处理的效果。

MLlib
MLlib 是 Spark 的机器学习库。其设计目标是使得机器学习变得简单且可扩展。它提供了以下工具:
- 常见的机器学习算法:如分类,回归,聚类和协同过滤;
- 特征化:特征提取,转换,降维和选择;
- 管道:用于构建,评估和调整 ML 管道的工具;
- 持久性:保存和加载算法,模型,管道数据;
- 实用工具:线性代数,统计,数据处理等。
Graphx
GraphX 是 Spark 中用于图形计算和图形并行计算的新组件。在高层次上,GraphX 通过引入一个新的图形抽象来扩展 RDD(一种具有附加到每个顶点和边缘的属性的定向多重图形)。为了支持图计算,GraphX 提供了一组基本运算符(如: subgraph,joinVertices 和 aggregateMessages)以及优化后的 Pregel API。此外,GraphX 还包括越来越多的图形算法和构建器,以简化图形分析任务。
六、spark作业运行流程
Spark 有3 种运行模式,包括 Standalone、YARN 和 Mesos,其中,Mesos 和 YARN模式类似。目前用得比较多的是 Standalone 模式和 YARN 模式。下面将详细介绍 Standalone模式和 YARN 模式的启动方式及运行流程。
Standalone模式
Standalone模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。Driver既可以运行在Master节点上,也可以运行在本地Client端。
当以 Standalone 模式向 Spark 集群提交作业时,作业的运行流程如图所示。

(1)首先,SparkContext 连接到 Master,向 Master 注册并申请资源。
(2)Worker 定期发送心跳信息给 Master 并报告 Executor 状态。
(3)Master 根据 SparkContext 的资源申请要求和 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,然后在该 Worker 上获取资源,启动 StandaloneExecutorBackend。
(4)StandaloneExecutorBackend向 SparkContext注册。
(5)SparkContext 将 Application 代码发送给 StandaloneExecutorBackend,并且 SparkContext 解析 Application 代码,构建 DAG 图,并提交给 DAG Scheduler,分解成 Stage(当碰到 Action 操作时,就会催生 Job,每个 Job 中含有一个或多个 Stage),然后将 Stage(或者称为 TaskSet)提交给 Task Scheduler,Task Scheduler 负责将Task 分配到相应的 Worker,最后提交给 StandaloneExecutorBackend 执行。
(6)StandaloneExecutorBackend 会建立 Executor 线程池,开始执行 Task,并向 SparkContext 报告,直至 Task 完成。
(7)所有 Task 完成后,SparkContext 向 Master 注销,释放资源。
Standalone-Client(Driver在client运行)
提交命令:
1 | spark-submit --master spark://172.18.0.2:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi /spark/examples/jars/spark-examples_2.11-2.3.3.jar 10000 |

(1)Client模式下提交任务,在客户端启动Driver进程。
(2)Driver会向Master申请启动Application启动的资源。
(3)资源申请成功,Driver端将Task发送到Worker端执行。
(4)Worker将Task执行结果返回到Driver端。
work端进程

master端进程

client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。生产环境下不能使用client模式,因为Driver可能会回收task执行结果数据,假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端所在节点的Driver收集100个application的结果数据,导致100次网卡流量暴增的问题。
Standalone-Cluster(Driver在Worker运行)
提交命令:
1 | spark-submit --master spark://172.18.0.2:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi /spark/examples/jars/spark-examples_2.11-2.3.3.jar 10000 |

(1)Standalone-Cluster模式提交App后,会向Master请求启动Driver。
(2)Master接收请求之后,随机在集群中一台节点启动Driver进程。
(3)Driver启动后为当前的应用程序申请资源。
(4)Driver端发送task到worker节点上执行。
(5)worker将执行情况和执行结果返回给Driver端。
work端进程

master端进程

Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况的。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。
Yarn模式
Yarn-cluster
提交命令:
1 | spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /usr/hdp/3.0.1.0-187/spark2/examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar |
在集群模式下,Driver 运行在 Application Master 上,Application Master 进程同时负责驱动 Application 和从 YARN 中申请资源。该进程运行在 YARN Container 内,所以启动Application Master 的 Client 可以立即关闭,而不必持续到 Application 的声明周期。

(1)Client(客户端)生成作业信息提交给 ResourceManager。
(2)ResourceManager 在某一个 NodeManager (由 YARN 决定)启动 Container,并将Application Master 分配给该 NodeManager。
(3)NodeManager 接收到 ResourceManager 的分配,启动 Application Master 并初始化作业,此时 NodeManager 就称为 Driver。
(4)Application 向 ResourceManager 申请资源,ResourceManager 分配资源的同时通知其他 NodeManager 启动相应的 Executor。
(5)Executfor 向 NodeManager 上的 Application Master 注册汇报并完成相应的任务。图 1-19 是 YARN 客户端模式的作业运行流程。Application Master 仅仅从 YARN 中申请资源给 Executor,之后 Client 会与 Container通信进行作业的调度。
进程

应用的运行结果不能在客户端显示(可以在 history server 中查看),所以最好将结果保存在 HDFS 而非 stdout 输出,客户端的终端显示的是作为 Yarn 的 job 的简单运行状况。在此模式下,Driver运行在AM(ApplicationMaster)里,可以理解为AM包括了Driver的功能就像Driver运行在AM里一样,此时的AM既能够向AM申请资源并进行分配,又能完成Driver划分RDD提交task等工作。
Yarn-client
提交命令:
1 | spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi /usr/hdp/3.0.1.0-187/spark2/examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar |
下图是Yarn-client模式的作业运行流程。Application Master 仅仅从 YARN 中申请资源给 Etecutia,之后 Client会与 Container 通信进行作业的调度。

YARN-Client 模式的作业运行流程描述如下。
(1)客户端生成作业信息提交给 ResourceManager。
(2)ResourceManager 在本地 NodeManager启动 Container,并将 Application Master分配给该 NodeManager。
(3)NodeManager 接收到 ResourceManager 的分配,启动 Application Master 并初始化作业,此时这个 NodeManager 就称为 Driver。
(4)Application 向 ResourceManager 申请资源,ResourceManager 分配资源同时通知其他 NodeMamager 启动相应的 Executor。
(5)Executor 向本地启动的 Application Master 注册汇报并完成相应的任务。

Driver运行在客户端上,先有driver再用AM,此时driver负责RDD生成、task生成和分发,向AM申请资源等 ,AM负责向RM申请资源,其他的都由driver来完成。
从上面两张图可看出 YARN-Cluster 和 YARN-Client 的区别。在 YARN-Cluster模式下,SparkDriver 运行在 Application Master(AM)中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN上运行,所以 YARN-Cluster 模式不适合运行交互类型的作业。然而在 YARN-Client 模式下,AM 仅仅向 YARN 请求 Executor,Client 会与请求得到的 Container 通信来调度它们工作,也就是说 Client 不能离开。
总结起来就是,集群模式的 Spark Driver 运行在 AM 中,而客户端模式的 Spark Driven运行在客户端。所以,YARN-Cluster 适用于生产,而 YARN-Client 适用于交互和调试,也就是希望快速地看到应用的输出信息。