Roger's Blog

  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 公益 404

  • 搜索

DataSphere Studio1.0本地调试开发指南

发表于 2021-09-14 更新于 2021-09-16 分类于 大数据

DataSphere Studio1.0本地调试开发指南

本指南仅做本地调试开发参考,不做线上部署参考,修改内容后可能会存在打包问题(未做测试)

数据库准备

新建数据库dds,前后执行db目录下的dss_ddl.sql、dss_dml.sql脚本文件

Maven构建打包获取dss-appconns下面相关的lib包便于后面调试

如果您是本地第一次使用,必须在最外层工程pom.xml所在目录先执行以下命令:

1
mvn -N  install

在最外层工程pom.xml所在目录执行以下命令

1
mvn clean install

获取安装包,在工程的assembly->target目录下:

1
wedatasphere-dss-x.x.x-dist.tar.gz

解压安装包到指定目录,得到安装包内dss-appconns目录的路径,例如:

/Users/roger/Downloads/wedatasphere-dss-1.0.0-dist/dss-1.0.0/dss-appconns

配置文件修改

把conf目录下的通用配置文件application-dss.yml、log4j2.xml、dss.properties、token.properties、log4j.properties5个人配置问价分别拷贝到dss-apps/dss-apiservice-server、dss-apps/dss-datapipe-server、dss-framewor/dss-framework-orchestrator-server、dss-framewor/dss-framework-project-server、dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server、dss-orchestrator/orchestrators/dss-workflow/dss-workflow-server模块的Resources目录下面,需要自己新建Resources目录,然后再把剩余的每个模块所对应的文件分别拷贝到每个模块所对应的Resources目录

image-20210916092201536

并设置标记好为resource目录

image-20210914203153198

修改每个模块中配置文件中注册中心、数据库、服务名等配置信息(PS:也可以先修改后拷贝到每个模块)

application-dss.yml:

nD9jDu

dss.properties:

0pjAya

如果你没配置当前dss运行的环境是开发环境还是生产环境的话,默认是DEV,需要修改两个服务名,一遍出现网关无法通过服务名去转发请求的情况

修改dss-framework-orchestrator-server.properties的spring.spring.application.name的值为dss-framework-orchestrator-server-dev

wvm2Y5

修改dss-workflow-server.properties的spring.spring.application.name的值为dss-workflow-server-dev

xXBXXs

修改POM文件

  • 修改dss-apps/dss-apiservice-server的pom.xml避免运行中可能会出现classNotFound的错误:

增加以下内容:

1
2
3
4
5
6
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.26</version>
<scope>compile</scope>
</dependency>
  • 修改dss-framewor/dss-framework-orchestrator-server的pom.xml避免运行中可能会出现classNotFound的错误:

增加以下内容:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.8.3</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-sender-service</artifactId>
<version>${dss.version}</version>
<scope>provided</scope>
</dependency>

修改以下内容:

旧:

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-rpc</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<groupId>org.springframework.cloud</groupId>
</exclusion>
</exclusions>
</dependency>

新:

1
2
3
4
5
6
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-rpc</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
</dependency>
  • 修改dss-framewor/dss-framework-project-server的pom.xml避免运行中可能会出现classNotFound的错误:

增加以下内容:

1
2
3
4
5
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.8.3</version>
</dependency>
  • 修改dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server的pom.xml避免运行中可能会出现classNotFound的错误:

修改以下内容:

旧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-rpc</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</exclusion>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.hk2.external</groupId>
<artifactId>bean-validator</artifactId>
</exclusion>
</exclusions>
</dependency>

新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-rpc</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.hk2.external</groupId>
<artifactId>bean-validator</artifactId>
</exclusion>
</exclusions>
</dependency>
  • 修改dss-orchestrator/orchestrators/dss-workflow/dss-workflow-server的pom.xml避免运行中可能会出现classNotFound的错误:

增加以下内容:

1
2
3
4
5
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.8.3</version>
</dependency>

修改以下内容:

旧:

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-rpc</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<groupId>org.springframework.cloud</groupId>
</exclusion>
</exclusions>
</dependency>

新:

1
2
3
4
5
6
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-rpc</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
</dependency>

修改代码避免创建工作流出现appconn空指针异常

修改DefaultLinkedAppConnResolver.java这个类:

旧:

1
2
3
4
5
6
7
8
9
10
11
@Override
public List<AppConn> resolveAppConnByUser(String userName, String workspaceName, String typeName) {
//todo 后面可以使用数据库表来定义用户可以加载的AppConn.
List<AppConn> appConns = new ArrayList<>();
for(AppConn appConn : AppConnManager.getAppConnManager().listAppConns()){
//可以在这里根据用户情况和工作空间情况,限制appConn的加载
appConns.add(appConn);
}

return appConns;
}

新:

1
2
3
4
5
6
7
@Override
public List<AppConn> resolveAppConnByUser(String userName, String workspaceName, String typeName) {
//todo 后面可以使用数据库表来定义用户可以加载的AppConn.
List<AppConn> appConns = new ArrayList<>(AppConnManager.getAppConnManager().listAppConns());

return appConns;
}

至于为啥这里会出现空指针,还有待研究

配置IDEA启动

IDEA新增六个Application,对应前面的六个模块,运行主类都在每个模块的scala目录下面,记得勾选scope dependence

image-20210916121948907

image-20210916102002420

DSSOrchestratorServerApplication需要设置VM运行参数

1
2
-DserviceName=dss-framework-orchestrator-server
-DDSS_HOME=/Users/roger/Downloads/wedatasphere-dss-1.0.0-dist/dss-1.0.0

DSSProjectServerApplication需要设置VM运行参数

1
2
3
-DserviceName=dss-framework-project-server
-DDSS_HOME=/Users/roger/Downloads/wedatasphere-dss-1.0.0-dist/dss-1.0.0
-DDSS_INSTALL_HOME_VAL=/Users/roger/Downloads/wedatasphere-dss-1.0.0-dist/dss-1.0.0

DSSDatapipeServerApplication需要设置VM运行参数

1
-DserviceName=dss-datapipe-server

DSSWorkflowServerApplication需要设置VM运行参数

1
-DserviceName=dss-workflow-server

DSSApiServiceServerApplication需要设置VM运行参数

1
-DserviceName=dss-apiservice-server

DSSFowExecutionServerApplication需要设置VM运行参数

1
-DserviceName=dss-flow-execution-server

JVM参数解析:

-DserviceName这个参数主要用于服务查找对应的properties配置文件

-DDSS_HOME和-DDSS_INSTALL_HOME_VAL这个参数主要用于在发现appconns插件依赖相关的lib包(本文步骤二的解压路径),并且在DSSProjectServerApplication项目启动初始化的时候会上传到hadoop的hdfs文件系统中,便于后面的流程使用

数据库修改

在项目启动后的一些使用过程中,可能会出现一些appconn lib无法找到导致的异常,需要去手动修改一下数据库

修改dss_appconn这张表的appconn_class_path字段的内容,修改为你本地存储这些lib的路径(本文步骤二的解压路径),例如:

Gh05hg

前端本地调试设置

修改web文件下下面的vue.config.js文件,修改devServe下面的内容如下:

修改target的内容为你网关的地址,例如:

OzQncL

其他问题

spark提交任务的时候会读取当前系统用户,如果是分布式部署dss和linkis的话,就会出现linkis所在的机器没有dss所在机器的用户,就会出现Permission denied: user=Administrator, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x的异常,可以在linkis所在的服务器上执行以下命令即可解决:

1
2
3
adduser Administrator
groupadd supergroup
usermod -a -G supergroup Administrator

Maven使用教程

发表于 2021-03-07 更新于 2021-03-08 分类于 spring boot

Maven介绍

Maven 翻译为”专家”、”内行”,是 Apache 下的一个纯 Java 开发的开源项目。基于项目对象模型(缩写:POM)概念,Maven利用一个中央信息片断能管理一个项目的构建、报告和文档等步骤。

Maven 是一个项目管理工具,可以对 Java 项目进行构建、依赖管理。

Maven 也可被用于构建和管理各种项目,例如 C#,Ruby,Scala 和其他语言编写的项目。Maven 曾是 Jakarta 项目的子项目,现为由 Apache 软件基金会主持的独立 Apache 项目。

Maven POM

POM( Project Object Model,项目对象模型 ) 是 Maven 工程的基本工作单元,是一个XML文件,包含了项目的基本信息,用于描述项目如何构建,声明项目依赖,等等。

执行任务或目标时,Maven 会在当前目录中查找 POM。它读取 POM,获取所需的配置信息,然后执行目标。

POM 中可以指定以下配置:

  • 项目依赖

  • 插件

  • 执行目标

  • 项目构建 profile

  • 项目版本

  • 项目开发者列表

  • 相关邮件列表信息

所有 POM 文件都需要 project 元素和三个必需字段:groupId,artifactId,version。

节点 描述
modelVersion 模型版本
groupId 这是工程组的标识。它在一个组织或者项目中通常是唯一的
artifactId 这是工程的标识。它通常是工程的名称
version 这是工程的版本号。在 artifact 的仓库中,它用来区分不同的版本

父(Super)POM

在 Java 中,所有的类都继承自 Object 超类,而在 maven 中也存在 The Super POM,所有 maven 工程的pom 文件都继承自该文件,那这个 The Super POM 中设置了什么呢?

下面展示的 maven 3.5.4 版本的 The Super POM。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<project>
<modelVersion>4.0.0</modelVersion>

<repositories>
<repository>...</repository>
</repositories>
<pluginRepositories>
<pluginRepository>...</pluginRepository>
</pluginRepositories>

<build>
<directory>${project.basedir}/target</directory>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<finalName>${project.artifactId}-${project.version}</finalName>
<testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
<sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
<scriptSourceDirectory>${project.basedir}/src/main/scripts</scriptSourceDirectory>
<testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<testResources>
<testResource>
<directory>${project.basedir}/src/test/resources</directory>
</testResource>
</testResources>
<pluginManagement>
<!-- NOTE: These plugins will be removed from future versions of the super POM -->
<!-- They are kept for the moment as they are very unlikely to conflict with lifecycle mappings (MNG-4453) -->
<plugins>
...
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
</plugin>
...
</plugins>
</pluginManagement>
</build>

<reporting>
<outputDirectory>${project.build.directory}/site</outputDirectory>
</reporting>

<profiles>...</profiles>

</project>

我对上面的 pom 文件进行了删减,仅展示了部分内容,它们可以概括为以下 3 个部分:

  • 默认的 repository

maven 默认使用官方提供的 central repository,这也是你可以开箱即用的原因,而在安装阶段配置国内的镜像源就是为了替换默认的设置。

  • 标准的目录骨架

在 build 标签中,可以看到如 sourceDirectory 、 testSourceDirectory 、 outputDirectory 等标签,这些标签为我们设置了默认的 maven 目录骨架,明确了源代码目录、测试目录、输出目录等各种目录,使得我们在接触一个任何一个 maven 工程时都可以快速找到相对应的代码入口。

这也提示我们,其实我们可以自定义 maven 目录结构,但完全不推荐这么做。

  • 默认的插件版本

在 The Super POM 中还定义了多个默认插件的版本,使得你省去了配置插件的工作,当然你也可以自己设置插件的版本号。

Maven 使用 effective pom(Super pom 加上工程自己的配置)来执行相关的目标,它帮助开发者在 pom.xml 中做尽可能少的配置,当然这些配置可以被重写。

使用以下命令来查看 Super POM 默认配置:

1
mvn help:effective-pom

Effective POM 的结果是经过继承、插值之后,使配置生效。

常用POM标签解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<!--声明项目描述符遵循哪一个POM模型版本。模型本身的版本很少改变,虽然如此,但它仍然是必不可少的,这是为了当Maven引入了新的特性或者其他模型变更的时候,确保稳定性。 -->
<modelVersion>4.0.0</modelVersion>

<!--项目的全球唯一标识符,通常使用全限定的包名区分该项目和其他项目。并且构建时生成的路径也是由此生成, 如com.mycompany.app生成的相对路径为:/com/mycompany/app -->
<groupId>org.example</groupId>
<!-- 构件的标识符,它和group ID一起唯一标识一个构件。换句话说,你不能有两个不同的项目拥有同样的artifact ID和groupID;在某个
特定的group ID下,artifact ID也必须是唯一的。构件是项目产生的或使用的一个东西,Maven为项目产生的构件包括:JARs,源 码,二进制发布和WARs等。 -->
<artifactId>maven_test</artifactId>
<!--项目产生的构件类型,例如jar、war、ear、pom。插件可以创建他们自己的构件类型,所以前面列的不是全部构件类型 -->
<packaging>jar</packaging>
<!--项目当前版本,格式为:主版本.次版本.增量版本-限定版本号 -->
<version>1.0-SNAPSHOT</version>

<!--项目的名称, Maven产生的文档用 -->
<name>maven-test</name>

<!--&lt;!&ndash;描述了这个项目构建环境中的前提条件。 &ndash;&gt;
<prerequisites>
&lt;!&ndash;构建该项目或使用该插件所需要的Maven的最低版本 &ndash;&gt;
<maven />
</prerequisites>-->

<!--以值替代名称,Properties可以在整个POM中使用,也可以作为触发条件(见settings.xml配置文件里activation元素的说明)。格式是<name>value</name>。 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<log4j2-version>2.13.3</log4j2-version>
<camel.version>3.8.0</camel.version>
<camel-main.version>3.8.0</camel-main.version>
</properties>

<!-- 继承自该项目的所有子项目的默认依赖信息。这部分的依赖信息不会被立即解析,而是当子项目声明一个依赖(必须描述group ID和 artifact
ID信息),如果group ID和artifact ID以外的一些信息没有描述,则通过group ID和artifact ID 匹配到这里的依赖,并使用这里的依赖信息。 -->
<dependencyManagement>
<!--该元素描述了项目相关的所有依赖。 这些依赖组成了项目构建过程中的一个个环节。它们自动从项目定义的仓库中下载。要获取更多信息,请看项目依赖机制。 -->
<dependencies>
<!-- Camel BOM -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-bom</artifactId>
<version>3.8.0</version>
<!--依赖范围。在项目发布过程中,帮助决定哪些构件被包括进来。欲知详情请参考依赖机制。 - compile :默认范围,用于编译 - provided:类似于编译,但支持你期待jdk或者容器提供,类似于classpath
- runtime: 在执行时需要使用 - test: 用于test任务时使用 - system: 需要外在提供相应的元素。通过systemPath来取得
- systemPath: 仅用于范围为system。提供相应的路径 - optional: 当项目自身被依赖时,标注依赖是否传递。用于连续依赖时使用
- import: 这个作用域只支持<dependencyManagement>部分中的pom类型的依赖关系,它表示该依赖关系要被指定的POM的<dependencyManagement>部分中的有效依赖列表所取代。
它表示该依赖关系将被指定的 POM 的 <dependencyManagement> 部分中的有效依赖列表所替换。
由于它们被替换,所以具有导入范围的依赖关系实际上并不参与限制依赖关系的转义。-->
<scope>import</scope>
<!-- 依赖类型,默认类型是jar。它通常表示依赖的文件的扩展名,但也有例外。一个类型可以被映射成另外一个扩展名或分类器。类型经常和使用的打包方式对应,
尽管这也有例外。一些类型的例子:jar,war,ejb-client和test-jar。如果设置extensions为 true,就可以在 plugin里定义新的类型。所以前面的类型的例子不完整。 -->
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-main</artifactId>
<version>${camel-main.version}</version>
</dependency>

<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>runtime</scope>
<version>${log4j2-version}</version>
</dependency>

<!-- testing -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!--构建项目需要的信息 -->
<build>
<defaultGoal>install</defaultGoal>

<!--使用的插件列表 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<!--作为DOM对象的配置 -->
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>

<!-- Allows the example to be run via 'mvn camel:run' -->
<plugin>
<groupId>org.apache.camel</groupId>
<artifactId>camel-maven-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<logClasspath>true</logClasspath>
<mainClass>org.example.MainApp</mainClass>
</configuration>
</plugin>

</plugins>
</build>

<!--发现依赖和扩展的远程仓库列表。 -->
<repositories>
<!--包含需要连接到远程仓库的信息 -->
<repository>
<!--远程仓库唯一标识符。可以用来匹配在settings.xml文件里配置的远程仓库 -->
<id>maven-ali</id>
<!--远程仓库名称 -->
<name>aliyun-repo</name>
<!--远程仓库URL,按protocol://hostname/path形式 -->
<url>https://maven.aliyun.com/repository/public</url>
<!--如何处理远程仓库里发布版本的下载 -->
<releases>
<!--true或者false表示该仓库是否为下载某种类型构件(发布版,快照版)开启。 -->
<enabled>true</enabled>
</releases>
<!-- 如何处理远程仓库里快照版本的下载。有了releases和snapshots这两组配置,POM就可以在每个单独的仓库中,为每种类型的构件采取不同的
策略。例如,可能有人会决定只为开发目的开启对快照版本下载的支持。参见repositories/repository/releases元素 -->
<snapshots>
<enabled>true</enabled>
<!--该元素指定更新发生的频率。Maven会比较本地POM和远程POM的时间戳。这里的选项是:always(一直),daily(默认,每日),interval:X(这里X是以分钟为单位的时间间隔),或者never(从不)。 -->
<updatePolicy>always</updatePolicy>
<!--当Maven验证构件校验文件失败时该怎么做:ignore(忽略),fail(失败),或者warn(警告)。 -->
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>

<!--项目分发信息,在执行mvn deploy后表示要发布的位置。有了这些信息就可以把网站部署到远程服务器或者把构件部署到远程仓库。 -->
<distributionManagement>
<!--部署项目产生的构件到远程仓库需要的信息 -->
<repository>
<id>roger-public</id>
<name>Roger-maven3</name>
<url>http://127.0.0.1:8081/repository/maven-releases/</url>
</repository>
<!--构件的快照部署到哪里?如果没有配置该元素,默认部署到repository元素配置的仓库,参见distributionManagement/repository元素 -->
<snapshotRepository>
<id>roger-public</id>
<name>Roger-maven3 Snapshot Repository</name>
<url>http://127.0.0.1:8081/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>


</project>

Maven生命周期

Maven 构建生命周期定义了一个项目构建跟发布的过程。

一个典型的 Maven 构建(build)生命周期是由以下几个阶段的序列组成的:

img

阶段 处理 描述
validate 验证项目 验证项目是否正确且所有必须信息是可用的
compile 执行编译 源代码编译在此阶段完成
test 测试 使用适当的单元测试框架(例如JUnit)运行测试。
package 打包 创建JAR/WAR包如在 pom.xml 中定义提及的包
verify 检查 对集成测试的结果进行检查,以保证质量达标
install 安装 安装打包的项目到本地仓库,以供其他项目使用
deploy 部署 拷贝最终的工程包到远程仓库中,以共享给其他开发人员和工程

maven 定义了三种构建**生命周期(Build Lifecycle),分别是 **clean、default、site,它们分别用于不同的任务:

  • clean 生命周期:处理工程构建前的清理工作;
  • default 生命周期:处理工程的编译、打包、部署;
  • site 生命周期:生成工程的文档。

每一种构建生命周期都由多个构建阶段(Build Phase)组成,如 clean 生命周期就由 pre-clean、clean、post-clean 三个构建阶段组成,而 default 声明周期则有 23 个构建阶段,详细内容可查看官方文档。

maven 插件

虽然 maven 定义了构建生命周期和构建阶段,并指明了每个阶段的任务目标,但这些目标都是由 maven 插件(plugin)实现的,可以将构建阶段类比为一个 Java 接口,而将插件类比为一个 Java 实现类。

maven 插件有一个很重要的概念:goal,你可以直译为“目标”,但整个知识库仍直接使用英语原文。一个插件可以有多个 goal,一个 goal 对应一个功能,比如 maven-compiler-plugin 插件,就有两个 goal: compile 和 testCompile ,前者表示编译工程代码,后者表示编译工程测试代码。

内置生命周期绑定

“生命周期绑定”指的是将“生命周期的构建阶段”与“plugin:goal”绑定,而“内置(build-in)”指的是这是 maven 默认的。需要特别指出的是:不是所有构建阶段都(默认)绑定了 plugin:goal。

下面列出了几个常见的生命周期绑定,详细内容请查阅官方文档:

iXmrpw

构建阶段的有序性

使用 mvn compile 命令编译代码时的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ mvn compile
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< com.example:first-maven-project >-------------------
[INFO] Building first-maven-project 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ first-maven-project ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ first-maven-project ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /home/liming/Program/maven-tutorial/build-first-maven-project/target/classes
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.006 s
[INFO] Finished at: 2020-10-31T09:40:56+08:00
[INFO] ------------------------------------------------------------------------

首先, mvn compile 命令表示执行(default 生命周期的)compile 阶段,而 compile 阶段默认绑定了 maven-compiler-plugin:compile ,所以 mvn compile 命令就相当于执行 mvn compiler:compile 命令,因此在第 12 行你看到了对应的提示。

那为什么在执行 maven-compiler-plugin:compile 前会先执行 maven-resources-plugin:resources 呢?这就与“构建阶段的有序性”有关了。

“构建阶段的有序性”是指在同一种生命周期内,不同构建阶段的执行是有序的,执行某个阶段前,必须先执行完该阶段的所有前置阶段。如 compile 阶段属于 default 生命周期,而 process-resources 阶段在 compile 阶段之前,且默认绑定了 maven-resources-plugin:resources ,所以你看到第 8 行的提示。

前面说过,maven 并未对所有阶段都绑定 plugin:goal,所以即使构建阶段有序,有些阶段也可能并未真正执行。

至此,构建第一个 maven 工程时留下的疑惑就只剩下第 9 行和第 14 行的警告了,这个问题将在后文进行解答。

Java大数据开发入门系列(四)————Spark之RDD

发表于 2020-09-14 更新于 2020-09-15 分类于 大数据

RDD简介

RDD 全称为 Resilient Distributed Datasets,是 Spark 最基本的数据抽象。可以简单地把 RDD 理解成一个提供了许多操作接口的数据集合。和一般数据集不同的是,其实际数据被划分为一到多个分区,所有介区数据分布存储于一批机器中(内存或磁盘中),这里的分区可以简单地和 Hadoop HDFS 里面的文件块来对比理解。如图所示:

1600093597352

定义了一个名字为“myRDD”的 RDD 数据集,这个数据集被切分成了多个分区(Partion,可以对比 HDFS 的 Block 的概念来理解),可能每个分区实际存储在不同的机器上,同时也可能存储在内存(Memory)或硬盘上(HDFS,当然也可能是其他分布式文件系统)。

RDD是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他 RDD 转换而来,它具有以下特性:

  • 一个 RDD 由一个或者多个分区(Partitions)组成。对于 RDD 来说,每个分区会被一个计算任务所处理,用户可以在创建 RDD 时指定其分区个数,如果没有指定,则默认采用程序所分配到的 CPU 的核心数;
  • RDD 拥有一个用于计算分区的函数 compute;
  • RDD 会保存彼此间的依赖关系,RDD 的每次转换都会生成一个新的依赖关系,这种 RDD 之间的依赖关系就像流水线一样。在部分分区数据丢失后,可以通过这种依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算;
  • Key-Value 型的 RDD 还拥有 Partitioner(分区器),用于决定数据被存储在哪个分区中,目前 Spark 中支持 HashPartitioner(按照哈希分区) 和 RangeParationer(按照范围进行分区);
  • 一个优先位置列表 (可选),用于存储每个分区的优先位置 (prefered location)。对于一个 HDFS 文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算“的理念,Spark 在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理数据块的存储位置。

RDD两大类操作(spark 常用的算子)

RDD主要有两大类操作,分别为转换操作(Tiansformations)和行动操作(Actions))。转换操作主要是指把原始数据集加载到 RDD 以及把一个 RDD 转换为另外一个 RDD,而行动操作主要指把RDD 存储到硬盘或触发转换执行。例如,map 是一个 Transformation 操作,该操作作用于数据集上的每一个元素,并且返回一个新的 RDD 作为结果。而 reduce 是一个 Action 操作,该操作通过一些函数聚合 RDD 中的所有元素并且返回最终的结果给 Driver。

spark 常用的 Transformation 算子

Transformation 算子 Meaning(含义)
map(func) 对原 RDD 中每个元素运用 func 函数,并生成新的 RDD
filter(func) 对原 RDD 中每个元素使用func 函数进行过滤,并生成新的 RDD
flatMap(func) 与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq )。
mapPartitions(func) 与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator => Iterator ,其中 T 是 RDD 的类型,即 RDD[T]
mapPartitionsWithIndex(func) 与 mapPartitions 类似,但 func 类型为 (Int, Iterator) => Iterator ,其中第一个参数为分区索引
sample(withReplacement, fraction, seed) 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed);
union(otherDataset) 合并两个 RDD
intersection(otherDataset) 求两个 RDD 的交集
distinct([numTasks])) 去重
groupByKey([numTasks]) 按照 key 值进行分区,即在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, Iterable) Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 性能会更好 Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传入 numTasks 参数进行修改。
reduceByKey(func, [numTasks]) 按照 key 值进行分组,并对分组后的数据执行归约操作。
aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks]) 当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey 类似,reduce 任务的数量可通过第二个参数进行配置。
sortByKey([ascending], [numTasks]) 按照 key 进行排序,其中的 key 需要实现 Ordered 特质,即可比较
join(otherDataset, [numTasks]) 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin, rightOuterJoin 和 fullOuterJoin 等算子。
cogroup(otherDataset, [numTasks]) 在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, (Iterable, Iterable)) tuples 的 dataset。
cartesian(otherDataset) 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) 类型的 dataset(即笛卡尔积)。
coalesce(numPartitions) 将 RDD 中的分区数减少为 numPartitions。
repartition(numPartitions) 随机重新调整 RDD 中的数据以创建更多或更少的分区,并在它们之间进行平衡。
repartitionAndSortWithinPartitions(partitioner) 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。

Spark 常用的 Action 算子

Action(动作) Meaning(含义)
reduce(func) 使用函数func执行归约操作
collect() 以一个 array 数组的形式返回 dataset 的所有元素,适用于小结果集。
count() 返回 dataset 中元素的个数。
first() 返回 dataset 中的第一个元素,等价于 take(1)。
take(n) 将数据集中的前 n 个元素作为一个 array 数组返回。
takeSample(withReplacement, num, [seed]) 对一个 dataset 进行随机抽样
takeOrdered(n, [ordering]) 按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。只适用于小结果集,因为所有数据都会被加载到驱动程序的内存中进行排序。
saveAsTextFile(path) 将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。
saveAsSequenceFile(path) 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作要求 RDD 中的元素需要实现 Hadoop 的 Writable 接口。对于 Scala 语言而言,它可以将 Spark 中的基本数据类型自动隐式转换为对应 Writable 类型。(目前仅支持 Java and Scala)
saveAsObjectFile(path) 使用 Java 序列化后存储,可以使用 SparkContext.objectFile() 进行加载。(目前仅支持 Java and Scala)
countByKey() 计算每个键出现的次数。
foreach(func) 遍历 RDD 中每个元素,并对其执行fun函数b

所有的转换都是懒惰(Lazy)操作,它们只是记住了需要这样的转换操作,并不会马上执行,只有等到 Actions 操作的时候才会真正启动计算过程进行计算。举个具体的例子,如图所示,

1600094733358

先经过转换 textFile 把数据从 HDFS 加载到 RDDA 以及 RDDC,这时其实RDD A 或者 RDD C 中都是没有数据的。再到后面的转换 flatMap、map、reduceByKey等,分别把 RDD A 转换为 RDD B 到 RDD F 以及 RDD C 到 RDD E等,这些转换都是没有执行的。可以理解为先做个计划,但是没有具体执行,等到执行操作saveAsSequenceFile时,才开始真正触发并执行任务。

宽依赖和窄依赖

RDD 和它的父 RDD(s) 之间的依赖关系分为两种不同的类型:

  • 窄依赖 (narrow dependency):父 RDDs 的一个分区最多被子 RDDs 一个分区所依赖;
  • 宽依赖 (wide dependency):父 RDDs 的一个分区可以被子 RDDs 的多个子分区所依赖。

如下图,每一个方框表示一个 RDD,带有颜色的矩形表示分区:

EMhehb

区分这两种依赖是非常有用的:

  • 首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)对父分区数据进行计算,例如先执行 map 操作,然后执行 filter 操作。而宽依赖则需要计算好所有父分区的数据,然后再在节点之间进行 Shuffle,这与 MapReduce 类似。
  • 窄依赖能够更有效地进行数据恢复,因为只需重新对丢失分区的父分区进行计算,且不同节点之间可以并行计算;而对于宽依赖而言,如果数据丢失,则需要对所有父分区数据进行计算并再次 Shuffle。

Stage

Spark 中还有一个重要的概念,即 Stage。一般而言,一个 Job 会分成一定数量的 Stage。各个 Stage 之间按照顺序执行。

那么 Siage 是怎么划分的?

在 Spark 中,一个 Job 会被拆分成多组 Task,每组任务就是一个 Stage。而 Spark 中有两类 Task,分别是 ShuffleMapTask和 ResultTask。ShuffleMapTask 的输出是 Shuffle 所需的数据,ResultTask 的输出是最终的结果。

因此 Stage 也以此为依据进行划分,简单地说,Stage 是以 Shuffle 和 Result 这两种类型划分的,Shuffle 之前的所有变换是一个 Stage,Shuftle 之后的操作是另一个 Stage。比如

1
rdd.parallize(1 to 10).foreach(println)

这个操作没有 Shufile,直接就输出了。它的 Task 只有一个,即 ResultTask,Stage 也只有一个。如果是

1
rdd.map(x=>(x, 1).reduceByKey(_ + _).foreach(println)

这个 Job 因为有 reduceByKey 操作,所以有一个 Shuffie 过程,那么 reduceBykey之前的是一个 Stage,执行 shuffleMapTask,输出 Shufle 所需的数据reduceByKey 到最后是一个 Stage,直接就输出结果了。如果 Job 中有多个 Shufle,那么每个 Shufle 之前都是一个 Stage。

Job划分

aKk0mk

如图所示,这是一个 Job 的划分过程。在图中,可以看到有 3 个阶段(Stage)分别是 Stage 1(RDD A )、Stage 2( RDD C、RDD D、RDD E、RDD F)、Stage 3(包含所有 RDD)Spark 会将每一个 Job 划分为多个不同的 Stage,而 Stage 之间的依赖关系则形成了有向无环图(DAG )。对于窄依赖,Spark 会尽量多地将 RDD 转换放在同一个阶段(Stage)中。而对于宽依赖,由于宽依赖通常意味着 Shuffle操作,因此Spark会将Shuffle操作定义为阶段( Stage)的边界。也就是说,Spark 遇到宽依赖就划分为一个 Stage,遇到窄依赖则将这个 RDD 加入到该 Stage 中。因此在图中,RDD C、RDD D、RDD E、RDD F 被构建在一个 Stage 中,RDD A 被构建在一个单独的 Stage 中,而 RDDB 和 RDDG 又被构建在同一个 Stage 中。

RDD调度运行流程

结合前面的介绍,针对 Spark的RDD调度运行流程简单解释如下。

1600096397045

如图所示,用户代码(如rdd1.join…)转换为有向无环图(DAG)后,交给 DAGScheduler,由它把 RDD 的有向无环图分割成各个 Stage 的有向无环图,形成 TaskSet,再提交给 TaskScheduler,由 TaskSeheduler 把任务(Task)提交给每个 Worker 上的 Executor,执行具体的 Task。在 TaskSeheduler 中,是不知道各个 Stage 的存在的,运行的只有 Task。

RDD内存管理(持久化)

Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些 数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。

你能通过persist()或者cache()方法持久化一个rdd。首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术-如果RDD的任何一个分区丢失,它 可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。

此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到 Tachyon中。我们可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY。完整的存储级别介绍如下所示:

Storage Level Meaning
MEMORY_ONLY 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,一些分区将不会被缓存,从而在每次需要这些分区时都需重新计算它们。这是系统默认的存储级别。
MEMORY_AND_DISK 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,将这些不适合存在内存中的分区存储在磁盘中,每次需要时读出它们。
MEMORY_ONLY_SER 将RDD作为序列化的Java对象存储(每个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,但是会更耗费cpu资源—密集的读操作。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。
DISK_ONLY 仅仅将RDD分区存储到磁盘中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面的存储级别类似,但是复制每个分区到集群的两个节点上面
OFF_HEAP (experimental) 以序列化的格式存储RDD到Tachyon中。相对于MEMORY_ONLY_SER,OFF_HEAP减少了垃圾回收的花费,允许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具有更强的吸引力。

NOTE:在python中,存储的对象都是通过Pickle库序列化了的,所以是否选择序列化等级并不重要。

Spark也会自动持久化一些shuffle操作(如reduceByKey)中的中间数据,即使用户没有调用persist方法。这样的好处是避免了在shuffle出错情况下,需要重复计算整个输入。如果用户计划重用 计算过程中产生的RDD,我们仍然推荐用户调用persist方法。

如何选择存储级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

  • 如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。

  • 如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。

  • 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。

  • 如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。

  • 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:

    • 它运行多个执行者共享Tachyon中相同的内存池

    • 它显著地减少垃圾回收的花费

    • 如果单个的执行者崩溃,缓存的数据不会丢失

回收策略

为了管理有限的内存资源,我们在 RDD的层面上采用 LRU(最近最少使用)回收策略。当一个新的 RDD 分区被计算但是没有足够的内存空间来存储这个分区的数据的时候,我们回收掉最近很少使用的 RDD 的分区数据的占用内存,如果这个 RDD 和这个新的计算分区的 RDD 时同一个 RDD 的时候,我们则不对这个分区数据占用的内存做回收。在这种情况下,我们将相同的 RDD 的老分区的数据保存在内存中是为了不让老是重新计算这些分区的数据,这点是非常重要的,因为很多操作都是对整个 RDD 的所有的 tasks 进行计算的,所以非常有必要将后续要用到的数据保存在内存中。到目前为止,我们发现这种默认的机制在所有的应用中工作的很好,但是我们还是将持久每一个 RDD 数据的策略的控制权交给用户。

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
* Spark RDD的持久化
*/
object _01SparkPersistOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

var start = System.currentTimeMillis()
val linesRDD = sc.textFile("D:/data/spark/sequences.txt")
// linesRDD.cache()
// linesRDD.persist(StorageLevel.MEMORY_ONLY)

// 执行第一次RDD的计算
val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// retRDD.cache()
// retRDD.persist(StorageLevel.DISK_ONLY)
retRDD.count()
println("第一次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")

// 执行第二次RDD的计算
start = System.currentTimeMillis()
// linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count()
retRDD.count()
println("第二次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")

// 持久化使用结束之后,要想卸载数据
// linesRDD.unpersist()

sc.stop()

}
}

检查点支持

虽然“血缘关系”可以用于错误后RDD的恢复,但是对于很长的“血缘关系”的RDD来说,这样的恢复耗时比较长,因此需要通过检查点操作(Checkpoint)保存到外部存储中。

通常情况下,对于包含宽依赖的长“血缘关系”的RDD设置检查点操作是非常有用的。在这种情况下,集群中某个节点出现故障时,会使得从各个父RDD计算出的数据丢失,造成需要重新计算。相反,对于那些窄依赖的RDD,对其进行检查点操作就不是有必须。在这种情况下如果一个节点发生故障,RDD在该节点中丢失的分区数据可以通过并行的方式从其他节点中重新计算出来,计算成本只是复制RDD的很小部分。

Spark提供为RDD设置检查点操作的API,可以让用户自行决定需要为那些数据设置检查点操作。另外由于RDD的只读特性,使得不需要关心数据一致性问题, 比常用的共享内存更容易做检查点。

多用户管理

RDD模型将计算分解为多个相互独立的细粒度任务,这使得它在多用户集群能够支持多种资源共享算法。特别地,每个RDD应用可以在执行过程中动态调整访问资源。

  • 在每个应用程序中, Spark 运行多线程同时提交作业,并通过一种等级 公平调度器来实现多个作业对集群资源的共享,这种调度器和Hadoop Fair Scheduler 类似。该算法主要用于创建基于针对相同内存数据的多用户应用,例如: Spark SQL引擎有一个服务模式支持多用户并行查询。公平调度算法确保短的作业能够在即使长作业占满集群资源的情况下尽早完成。
  • Spark的公平调度也使用延迟调度,通过轮询每台机器的数据,在保持公平的情况下给予作业高的本地性。Spark 支持多级本地化访问策略(本地化),包括内存、磁盘和机架。
  • 由于任务相互独立,调度器还支持取消作业来为高优先级的作业腾出资源。
  • Spark中可以使用Mesos来实现细粒度的资源共享,这使得Spark应用能相互之间或在不同的计算框架之间实现资源的动态共享。
  • Spark使用Sparrow系统扩展支持分布式调度,该调度允许多个Spark应用以去中心化的方式在同一集群上排队工作,同时提供数据本地性、低延迟和公平性。

Java大数据开发入门系列(四)————Spark之作业调度

发表于 2020-09-07 更新于 2020-09-11 分类于 大数据

简介

Spark 有几种计算资源调度的方式。前面讲到,每个Spark应用(SparkContext的实例)都运行一组独立的executor进程。spark运行的集群管理器提供了跨应用之间的资源调度scheduling across applications。其次,在每个Spark应用中,如果多个 jobs(Spark算子)是由不同的线程提交的,那么它们可能会并发运行。如果你的应用是常见的通过网络服务请求,Spark包含一个fair scheduler(公平的调度器)来调度每个SparkContext内的资源。在 Spark 应用内部(对应同一个 SparkContext)各个作业之间,Spark 默认 FIFO 调度,同时也可以支持公平调度。

跨应用调度

当在集群上运行时,每个Spark应用都会得到一组独立的executorJVM,来运行其任务并存储数据。如果多个用户需要共享你的集群,那么会有很多资源分配相关的选项,如何设计还取觉于具体的集群管理器。

对 Spark 所支持的各个集群管理器而言,最简单的的资源分配,就是静态划分。这种方式就意味着,每个 Spark 应用都是设定一个最大可用资源总量,并且该应用在整个生命周期内都会占住这个资源。这种方式在 Spark独立部署 standalone 和 YARN调度,以及 Mesos 粗粒度模式(coarse-grained Mesos mode)下都可用。资源分配可以根据集群类型进行如下配置:

  • Standalone mode: 默认情况下,提交到standalone mode集群的应用将以FIFO(先进先出)的顺序运行,并且每个spark应用都会占用集群中所有可用节点。不过你可以通过设置 spark.cores.max 或者 spark.deploy.defaultCores 来限制单个应用所占用的节点个数。最后,除了可以控制对 CPU 的使用数量之外,还可以通过spark.executor.memory来控制各个应用的内存占用量。
  • Mesos: 在Mesos中要使用静态划分的话,需要将 spark.mesos.coarse 设为true,同样,你也需要配置 spark.cores.max来控制各个应用的 CPU 总数,以及 spark.executor.memory 来控制各个应用的内存占用。
  • YARN: 在 YARN 中需要使用 –num-executors 选项来控制 Spark 应用在集群中分配的执行器的个数。对于单个执行器(executor)所占用的资源,可以使用 –executor-memory 和 –executor-cores 来控制。

Mesos 上还有一种动态共享 CPU 的方式。在这种模式下,每个 Spark 应用的内存占用仍然是固定且独占的(仍由 spark.exexcutor.memory 决定),但是如果该 Spark 应用没有在某个机器上执行任务的话,那么其它应用可以占用该机器上的 CPU。这种模式对集群中有大量不是很活跃应用的场景非常有效,例如:集群中有很多不同用户的 Spark shell session。但这种模式不适用于低延时的场景,因为当 Spark 应用需要使用 CPU 的时候,可能需要等待一段时间才能取得对 CPU 的使用权。要使用这种模式,只需要在 mesos://URL 上设置 spark.mesos.coarse 属性为 false 即可。

值得注意的是,目前还没有任何一种资源分配模式支持跨 Spark 应用的内存共享。如果你想通过这种方式共享数据,我们建议你可以单独使用一个服务(例如:alluxio),这样就能实现多应用访问同一个 RDD 的数据。

动态资源分配

Spark 提供了一种基于负载来动态调节 Spark 应用资源占用的机制。这意味着,你的应用会在资源空闲的时间将其释放给集群,需要时再重新申请。这一特性在多个应用 Spark 集群资源的情况下特别有用。

这个特性默认是禁止的,但是在所有的粗粒度集群管理器上都是可用的,如:独立部署模式standalone mode,YARN mode,和Mesos 粗粒度模式(coarse-grained Mesos mode)。

配置和部署

要使用动态资源分配这一特性有两个前提条件。首先,你的应用必须设置 spark.dynamicAllocation.enabled 为 true。其次,你必须在每个worker节点上启动 external shuffle service,同时将 spark.shuffle.service.enabled 设为 true。external shuffle service 的目的是在移除 executor 的时候,能够保留 executor 输出的 shuffle 文件。启用 external shuffle service 的方式在各个集群管理器上各不相同:

在 Spark 独立部署的集群中,你只需要在 worker 启动前设置 spark.shuffle.service.enabled 为 true 即可。

在 Mesos 粗粒度模式下,你需要在各个节点上运行 $SPARK_HOME/sbin/start-mesos-shuffle-service.sh 并设置 spark.shuffle.service.enabled为true即可。例如,你可以在Marathon来启用这一功能。

在YARN模式下,需要按以下步骤在各个 NodeManager 上启动:here

资源分配策略

总体上来说,Spark 应该在executor空闲时将其关闭,而在后续要用时再申请。因为没有一个固定的方法,可以预测一个executor在后续是否马上会被分配去执行任务,或者一个新分配的executor实际上是空闲的,所以我们需要一个试探性的方法,来决定是否申请或是移除一个executor。

请求策略

一个启用了动态分配的 Spark 应用在等待任务需要调度的时候,会去申请额外的executor。在这种情况下,必定意味着已有的executor已经不足以同时执行所有未完成的任务。

Spark会轮流来申请executor。当有待处理的任务达到spark.dynamicAllocation.scheduleerBacklogTimeout秒时,就会触发实际的请求,如果等待队列中仍有挂起的任务,则每过 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 秒后触发一次资源申请。另外,每一轮申请的executor个数以指数形式增长。例如:一个 Spark 应用可能在首轮申请 1 个执行器,后续的轮次申请个数可能是 2 个、4 个、8 个……。

采用指数级增长策略的原因有两个:第一,一个应用程序在开始时应该谨慎地请求executor,以防万一发现只增加几个executor就够了,这和 TCP 慢启动有些类似;第二,如果一旦 Spark 应用确实需要申请多个executor的话,那么可以确保其所需的计算资源及时增长。

移除策略

移除executor的策略就简单得多了。Spark 应用会在某个执行器空闲超过 spark.dynamicAllocation.executorIdleTimeout 秒后将其删除,在大多数情况下,执行器的移除条件和申请条件都是互斥的,也就是说,执行器在有等待执行任务挂起时,不应该空闲。

优雅的关闭Executor

在非动态分配下,executor可能的退出原因有执行失败或是相关 Spark 应用已经退出。不管是哪种原因,executor的所有相关联的状态都已经不再需要,可以丢弃掉。但是在动态分配的情况下,executor有可能在 Spark 应用运行期间被移除。这时候,如果 Spark 应用尝试去访问该executor存储的状态,就必须重算这一部分数据。因此,Spark 需要一种机制,能够优雅的关闭executor,同时还保留其状态数据。

这种需求对于shuffles操作尤其重要。shuffle过程中,Spark executor首先将 map 输出写到本地磁盘,然后在其他executor试图获取这些map结果数据时,充当这些文件的服务器。一旦有某些任务执行时间过长,动态分配有可能在shuffle结束前移除任务异常的executor,而这些被移除的executor对应的数据将会被重新计算,但这些重算其实是不必要的。

要解决这一问题,就需要用到 external shuffle service,该服务在 Spark 1.2 引入。该服务在每个节点上都会启动一个不依赖于任何 Spark 应用或executor的独立进程。一旦该服务启用,Spark executor不再从各个executor上获取 shuffle文件,转而从这个 service 获取。这意味着,任何执行器输出的混洗状态数据都可能存留时间比对应的执行器进程还长。

除了shuffle文件之外,executor也会在磁盘或者内存中缓存数据。一旦executor被移除,其缓存数据将无法访问。这个问题目前还没有解决。或许在未来的版本中,可能会类似采用external shuffle service的方法,将缓存数据保存在堆外存储中以解决这一问题。

应用内调度

在指定的 Spark 应用内部(对应同一 SparkContext 实例),多个线程可能并发地提交 Spark 作业(job),他们可以同时运行。在本节中,作业(job)是指,由 Spark action 算子(如:collect)触发的一系列计算任务的集合。Spark 调度器是完全线程安全的,并且能够支持 Spark 应用同时处理多个请求(比如:来自不同用户的查询)。

默认,Spark 应用内部使用 FIFO 调度策略。每个作业被划分为多个阶段(stage)(例如:map 阶段和 reduce 阶段),第一个作业在其启动后会优先获取所有的可用资源,然后是第二个作业再申请,再第三个……。如果前面的作业没有把集群资源占满,则后续的作业可以立即启动运行,否则,后提交的作业会有明显的延迟等待。

不过从 Spark 0.8 开始,Spark 也能支持各个作业间的公平(Fair)调度。公平调度时,Spark 以轮询的方式给每个作业分配资源,因此所有的作业获得的资源大体上是平均分配。这意味着,即使有大作业在运行,小的作业再提交也能立即获得计算资源而不是等待前面的作业结束,大大减少了延迟时间。这种模式特别适合于多用户配置。要启用公平调度器,只需设置一下 SparkContext 中 spark.scheduler.mode 属性为 FAIR 即可 :

1
2
3
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平调度资源池

公平调度器还可以支持将作业分组放入资源池(pool),然后给每个资源池配置不同的调度选项(如:权重)(ps:就是设置调度优先级)。这样你就可以给一些比较重要的作业创建一个“高优先级”资源池,或者你也可以把每个用户的作业分到一组,这样一来就是各个用户平均分享集群资源,而不是各个作业平分集群资源。Spark 公平调度的实现方式基本都是模仿 Hadoop Fair Scheduler来实现的。

默认情况下,新提交的作业都会进入到默认资源池中,不过作业对应于哪个资源池,可以在提交作业的线程中用 SparkContext.setLocalProperty 设定 spark.scheduler.pool 属性。示例代码如下 :

1
2
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

一旦设好了局部属性,所有该线程所提交的作业(即:在该线程中调用 action 算子,如:RDD.save/count/collect 等)都会使用这个资源池。这个设置是以线程为单位保存的,你很容易实现用同一线程来提交同一用户的所有作业到同一个资源池中。同样,如果需要清除资源池设置,只需在对应线程中调用如下代码 :

1
sc.setLocalProperty("spark.scheduler.pool", null)

资源池默认行为

默认地,各个资源池之间平分整个集群的资源(包括 default 资源池),但在资源池内部,默认情况下,作业是 FIFO 顺序执行的。举例来说,如果你为每个用户创建了一个资源池,那么久意味着各个用户之间共享整个集群的资源,但每个用户自己提交的作业是按顺序执行的,而不会出现后提交的作业抢占前面作业的资源。

配置资源池属性

资源池的属性需要通过配置文件来指定。每个资源池都支持以下3个属性 :

  • schedulingMode:可以是 FIFO 或 FAIR,控制资源池内部的作业是如何调度的。
  • weight:控制资源池相对其他资源池,可以分配到资源的比例。默认所有资源池的 weight 都是 1。如果你将某个资源池的 weight 设为 2,那么该资源池中的资源将是其他池子的2倍。如果将 weight 设得很高,如 1000,可以实现资源池之间的调度优先级 – 也就是说,weight=1000 的资源池总能立即启动其对应的作业。
  • minShare:除了整体 weight 之外,每个资源池还能指定一个最小资源分配值(CPU 个数),管理员可能会需要这个设置。公平调度器总是会尝试优先满足所有活跃(active)资源池的最小资源分配值,然后再根据各个池子的 weight 来分配剩下的资源。因此,minShare 属性能够确保每个资源池都能至少获得一定量的集群资源。minShare 的默认值是 0。

资源池属性是一个 XML 文件,可以基于 conf/fairscheduler.xml.template 修改,然后在 SparkConf。的 spark.scheduler.allocation.file 属性指定文件路径:

1
conf.set("spark.scheduler.allocation.file", "/path/to/file")

资源池 XML 配置文件格式如下,其中每个池子对应一个 元素,每个资源池可以有其独立的配置 :

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>

额外补充:

Shuffle相关介绍

概述

有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为 Shuffle。

4C16KL

在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce,而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。而Spark也会有自己的shuffle实现过程。

Spark中的 shuffle

在DAG调度的过程中,Stage 阶段的划分是根据是否有shuffle过程,也就是存在 宽依赖 的时候,需要进行shuffle,这时候会将 job 划分成多个Stage,每一个 Stage 内部有很多可以并行运行的 Task。

XsfPkt

标注对应的RDD后:

![image-20200908223941532](/Users/roger/Library/Application Support/typora-user-images/image-20200908223941532.png)

LEZncX

stage与stage之间的过程就是 shuffle 阶段,前一个 Stage 的 ShuffleMapTask 进行 Shuffle Write, 把数据存储在 BlockManager 上面, 并且把数据位置元信息上报到 Driver 的 MapOutTrack 组件中, 下一个 Stage 根据数据位置元信息, 进行 Shuffle Read, 拉取上个 Stage 的输出数据。在 Spark 中,负责 shuffle 过程的执行、计算和处理的组件主要就是 ShuffleManager 。ShuffleManager 随着Spark的发展有两种实现的方式,分别为 HashShuffleManager 和 SortShuffleManager ,因此spark的Shuffle有 Hash Shuffle 和 Sort Shuffle 两种。

在 Spark 1.2 以前,默认的shuffle计算引擎是 HashShuffleManager 。

HashShuffleManager 有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的 ShuffleManager 改成了 SortShuffleManager 。

SortShuffleManager 相较于 HashShuffleManager 来说,有了一定的改进。主要就在于每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个 Stage 的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

Hash shuffle是不具有排序的Shuffle。

YARN启动shuffle服务

要在YARN群集中的每个NodeManager上启动Spark Shuffle服务,请遵循以下说明:

  1. 用YARN配置文件构建Spark。如果你使用的是预打包的发行版,请跳过这一步。
  2. 找到spark-<version>-yarn-shuffle.jar. 如果你是自己构建的spark会在 $SPARK_HOME/common/network-yarn/target/scala-<version> 下,如果你用的我是发行版则在 yarn 下.
  3. 将这个jar包添加到集群中所有NodeManagers的classpath中。
  4. 在每个节点的yarn-site.xml中,将spark_shuffle添加到yarn.nodemanager.aux-services中,然后将yarn.nodemanager.aux-services.spark_shuffle.class设置org.apache.spark.network.yarn.YarnShuffleService。
  5. 在 etc/hadoop/yarn-env.sh 中设置 YARN_HEAPSIZE(默认为 1000),增加 NodeManager 的堆大小,以避免在shuffle中的垃圾回收问题。
  6. 重新启动集群中的所有NodeManagers。

在YARN上运行shuffle service时,可以使用以下附加配置选项:

Property Name Default Meaning
spark.yarn.shuffle.stopOnFailure false 当Spark shuffle service的初始化出现故障时,是否要停止NodeManager。这可以防止在Spark shuffle service没有运行的NodeManagers上运行容器导致的应用失败。

Spark调度算法(FIFO、FAIR)

FIFO模式的算法类是FIFOSchedulingAlgorithm,FAIR模式的算法实现类是FairSchedulingAlgorithm。下面看两种模式下的比较函数的实现,FIFO:

先比较priority,在FIFO中该优先级实际上是Job ID,越早提交的job的jobId越小,priority越小,优先级越高。

若priority相同,则说明是同一个job里的TaskSetMagager,则比较StageId,StageId越小优先级越高。

1
2
3
4
5
6
7
8
9
10
11
12
13
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
//优先级比较,越小,就最先执行
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
//优先级相同,就比较stageId,StageId越小优先级越高
}
res < 0
}

2kctUu

FAIR 模式中有一个 rootPool 和多个子 Pool, 各个子 Pool 中存储着所有待分配的 TaskSetMagager 。

在 FAIR 模 式 中 , 需 要 先 对 子 Pool 进 行 排 序 , 再 对 子 Pool 里 面 的

TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质, 因此使用相同的排序算法。

排序过程的比较是基于 Fair-share 来比较的,每个要排序的对象包含三个属性:

runningTasks 值( 正在运行的 Task 数)、minShare 值、weight 值,比较时会综合考量 runningTasks 值, minShare 值以及 weight 值。

注意,minShare、weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定,调度池在构建阶段会读取此文件的相关配置。

1) 如果 A 对象的 runningTasks 大于它的 minShare, B 对象的 runningTasks 小于它的 minShare,那么 B 排在 A 前面; ( 如果一个Pool的miniShare够用,另一个不够用,先分配给够用的。)

2) 如果 A 、B 对象的 runningTasks 都小于它们的 minShare ,那么就比较runningTasks 与 minShare 的比值( minShare 使用率),谁小谁排前面;( 两个poll都够用,谁占miniShare的少先分配给谁。例如两个Pool同样数量的runningTask,先分配给miniShare大的。)

3) 如果 A 、B 对象的 runningTasks 都大于它们的 minShare ,那么就比较runningTasks 与 weight 的比值( 权重使用率),谁小谁排前面。(同样数量的runningTask,先分配给weight大的)

4) 如果上述比较均相等,则比较名字。

整体上来说就是通过 minShare 和 weight 这两个参数控制比较过程, 可以做到让 minShare 使用率和权重使用率少( 实际运行 task 比例较少) 的先运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
//默认为0,除非通过fair的配置文件进行了配置指定

val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
/* 如果是TaskSetManager时,就是taskSet中运行的task的个数,
* 如果是Pool实例是表示是所有使用这个poolName的所有的TaskSetManager正在运行的task的个数.
*/

val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
//只有在minShare在fair的配置文件中显示配置,同时大于正在运行的task的个数时,才会为true

val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
//运行的task的个数针对于minShare的比重

val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
//得到正在运行的task个数针对于pool的weight的比重
var compare: Int = 0

/*这里首先根据正在运行的task的个数是否已经达到调度队列中最小的分片的个数来进行排序,
* 如果s1中运行运行的个数小于s1的pool的配置的minShare,返回true,表示s1排序在前面.
* 如果s2中运行的task的个数小于s2的pool中配置的minShare(最小分片数)的值,表示s1小于s2,这时s2排序应该靠 * 前.
*/

if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
/*这种情况表示s1与s2两个队列中,正在运行的task的个数都已经大于(不小于)了两个子调度器中配置的minShare的 * 个数时,根据两个子调度器队列中正在运行的task的个数对应此调度器中最小分片的值所占的比重最小的一个排序更靠前
*/
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
/*这种情况表示s1与s2两个子调度器的队列中,正在运行的task的个数都还没有达到配置的最小分片的个数的情况,比
* 较两个队列中正在运行的task的个数对应调度器队列的weigth的占比,最小的一个排序更靠前
*/
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}

if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
//如果两个根据上面的计算,排序值都相同,就看看这两个调度器的名称,按名称的字节序来排序了.
s1.name < s2.name
}
}
}

参考文献:

  • Spark的Shuffle总结分析-说出你的愿望吧
  • Spark Shuffle 详解-小鹏
  • Job-scheduling-apache
  • SPARK的TASK调度器(FAIR公平调度算法)-herman很慢

Java大数据开发入门系列(四)————Spark简介

发表于 2020-09-02 更新于 2020-09-17 分类于 大数据

一、简介

Spark 于 2009 年诞生于加州大学伯克利分校 AMPLab,2013 年被捐赠给 Apache 软件基金会,2014 年 2 月成为 Apache 的顶级项目。相对于 MapReduce 的批处理计算,Spark 可以带来上百倍的性能提升,因此它成为继 MapReduce 之后,最为广泛使用的分布式计算框架。

二、Spark产生背景

Spark 是在 MapReduce 的基础上产生的,借鉴了大量 MapReduce 实践经验,引入多种新型涉及思想和优化策略。针对MapReduce计算框架存在的局限性进行分析,能更好的了解到 Spark。

MapReduce 的局限性如下:

1、处理效率低效

  • Map中间结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
  • 任务调度和启动开销大
  • 无法充分利用内存
  • Map 端和 Reduce 端均需要排序
  • 复杂功能 Io 开销大,对于复杂 sql,需转换 MapReduce 计算,需要通过 HDFS 进行磁盘数据交换,而读写Hfds需消耗大量磁盘和网络 IO

2、 不适合迭代计算(如机器学习、图计算等),交互式处理(数据挖掘) 和流式处理(点击日志分析)
3、 MapReduce 编程不够灵活

  • 仅支持 Map 和 Reduce 两种操作
  • 尝试函数式编程风格

4、计算框架多样化、无形中产生运维和管理成本

三、特点

Spark 是基与 MapReduce 基础产生了,克服了其存在的性能低下,变成不够灵活的缺点。

Spark 作为一种 DAG 计算框架,主要特点如下:

Apache Spark 具有以下特点:

  • 使用先进的 DAG 调度程序,查询优化器和物理执行引擎,以实现性能上的保证;
  • 多语言支持,目前支持的有 Java,Scala,Python 和 R;
  • 提供了 80 多个高级 API,可以轻松地构建应用程序;
  • 支持批处理,流处理和复杂的业务分析;
  • 丰富的类库支持:包括 SQL,MLlib,GraphX 和 Spark Streaming 等库,并且可以将它们无缝地进行组合;
  • 丰富的部署模式:支持本地模式和自带的集群模式,也支持在 Hadoop,Mesos,Kubernetes 上运行;
  • 多数据源支持:支持访问 HDFS,Alluxio,Cassandra,HBase,Hive 以及数百个其他数据源中的数据。

r9I3DA

三、Spark的组成

Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法、机器、人之间展现大数据应用的一个平台。也是处理大数据、云计算、通信的技术解决方案。

它的主要组件有:

  • SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。
  • SparkSQL:Spark Sql 是 Spark 来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。
  • SparkStreaming:是 Spark 提供的实时数据进行流式计算的组件。
  • MLlib:提供常用机器学习算法的实现库。
  • GraphX:提供一个分布式图计算框架,能高效进行图计算。
  • BlinkDB:用于在海量数据上进行交互式SQL的近似查询引擎。
  • Tachyon:以内存为中心高容错的的分布式文件系统。

四、集群架构

Term(术语) Meaning(含义)
Application Spark 应用程序,由集群上的一个 Driver 节点和多个 Executor 节点组成。
Driver program 主运用程序,该进程运行 main() 方法并且创建 SparkContext
Cluster manager 集群资源管理器(例如,Standlone Manager,Mesos,YARN)
Worker node 执行计算任务的工作节点
Executor 位于工作节点上的应用进程,负责执行计算任务并且将输出数据保存到内存或者磁盘中
Task 被发送到 Executor 中的工作单元
Job 一个Action算子(比如collect算子)对应一个Job,由并行计算的多个Task组成。
Stage 每个Job由多个Stage组成,每个Stage是一个Task集合,由DAG分割而成。
Task 承载业务逻辑的运算单元,是Spark平台中可执行的最小工作单元。一个应用根据执行计划以及计算量分为多个Task。

Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。Master作为整个集群的控制器,负责整个集群的正常运行;Worker相当于是计算节点,接收主节点命令与进行状态汇报;Executor负责任务的执行;Client作为用户的客户端负责提交应用,Driver负责控制一个应用的执行。

E2kvnO

执行过程:

  1. 用户程序创建 SparkContext 后,它会连接到集群资源管理器,集群资源管理器会为用户程序分配计算资源,并启动 Executor;
  2. Driver 将计算程序划分为不同的执行阶段和多个 task,之后将 task 发送给 Executor;
  3. Executor 负责执行 Task,并将执行状态汇报给 Driver,同时也会将当前节点资源的使用情况汇报给集群资源管理器。
  4. 因为Driver调度了集群上的 task(任务),更好的方式应该是在相同的局域网中靠近 worker 的节点上运行。如果你不喜欢发送请求到远程的集群,倒不如打开一个 RPC 至 driver 并让它就近提交操作而不是从很远的 worker 节点上运行一个 driver。

driver做什么

  • 运行应用程序的main函数
  • 创建spark的上下文
  • 划分RDD并生成有向无环图(DAGScheduler)
  • 与spark中的其他组进行协调,协调资源等等(SchedulerBackend)
  • 生成并发送task到executor(taskScheduler)

五、核心组件

Spark 基于 Spark Core 扩展了四个核心组件,分别用于满足不同领域的计算需求。

0HsQfr

Spark SQL

Spark SQL 主要用于结构化数据的处理。其具有以下特点:

  • 能够将 SQL 查询与 Spark 程序无缝混合,允许您使用 SQL 或 DataFrame API 对结构化数据进行查询;
  • 支持多种数据源,包括 Hive,Avro,Parquet,ORC,JSON 和 JDBC;
  • 支持 HiveQL 语法以及用户自定义函数 (UDF),允许你访问现有的 Hive 仓库;
  • 支持标准的 JDBC 和 ODBC 连接;
  • 支持优化器,列式存储和代码生成等特性,以提高查询效率。

Spark Streaming

Spark Streaming 主要用于快速构建可扩展,高吞吐量,高容错的流处理程序。支持从 HDFS,Flume,Kafka,Twitter 和 ZeroMQ 读取数据,并进行处理。

zcaFjF

Spark Streaming 的本质是微批处理,它将数据流进行极小粒度的拆分,拆分为多个批处理,从而达到接近于流处理的效果。

JUE5Rf

MLlib

MLlib 是 Spark 的机器学习库。其设计目标是使得机器学习变得简单且可扩展。它提供了以下工具:

  • 常见的机器学习算法:如分类,回归,聚类和协同过滤;
  • 特征化:特征提取,转换,降维和选择;
  • 管道:用于构建,评估和调整 ML 管道的工具;
  • 持久性:保存和加载算法,模型,管道数据;
  • 实用工具:线性代数,统计,数据处理等。

Graphx

GraphX 是 Spark 中用于图形计算和图形并行计算的新组件。在高层次上,GraphX 通过引入一个新的图形抽象来扩展 RDD(一种具有附加到每个顶点和边缘的属性的定向多重图形)。为了支持图计算,GraphX 提供了一组基本运算符(如: subgraph,joinVertices 和 aggregateMessages)以及优化后的 Pregel API。此外,GraphX 还包括越来越多的图形算法和构建器,以简化图形分析任务。

六、spark作业运行流程

Spark 有3 种运行模式,包括 Standalone、YARN 和 Mesos,其中,Mesos 和 YARN模式类似。目前用得比较多的是 Standalone 模式和 YARN 模式。下面将详细介绍 Standalone模式和 YARN 模式的启动方式及运行流程。

Standalone模式

Standalone模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。Driver既可以运行在Master节点上,也可以运行在本地Client端。

当以 Standalone 模式向 Spark 集群提交作业时,作业的运行流程如图所示。

YSmAE8

(1)首先,SparkContext 连接到 Master,向 Master 注册并申请资源。

(2)Worker 定期发送心跳信息给 Master 并报告 Executor 状态。

(3)Master 根据 SparkContext 的资源申请要求和 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,然后在该 Worker 上获取资源,启动 StandaloneExecutorBackend。

(4)StandaloneExecutorBackend向 SparkContext注册。

(5)SparkContext 将 Application 代码发送给 StandaloneExecutorBackend,并且 SparkContext 解析 Application 代码,构建 DAG 图,并提交给 DAG Scheduler,分解成 Stage(当碰到 Action 操作时,就会催生 Job,每个 Job 中含有一个或多个 Stage),然后将 Stage(或者称为 TaskSet)提交给 Task Scheduler,Task Scheduler 负责将Task 分配到相应的 Worker,最后提交给 StandaloneExecutorBackend 执行。

(6)StandaloneExecutorBackend 会建立 Executor 线程池,开始执行 Task,并向 SparkContext 报告,直至 Task 完成。

(7)所有 Task 完成后,SparkContext 向 Master 注销,释放资源。

Standalone-Client(Driver在client运行)

提交命令:

1
spark-submit  --master spark://172.18.0.2:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi /spark/examples/jars/spark-examples_2.11-2.3.3.jar  10000

otclL4

(1)Client模式下提交任务,在客户端启动Driver进程。
(2)Driver会向Master申请启动Application启动的资源。
(3)资源申请成功,Driver端将Task发送到Worker端执行。
(4)Worker将Task执行结果返回到Driver端。

work端进程

WLlPqD

master端进程

JDwCHC

client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。生产环境下不能使用client模式,因为Driver可能会回收task执行结果数据,假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端所在节点的Driver收集100个application的结果数据,导致100次网卡流量暴增的问题。

Standalone-Cluster(Driver在Worker运行)

提交命令:

1
spark-submit  --master spark://172.18.0.2:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi /spark/examples/jars/spark-examples_2.11-2.3.3.jar  10000

FvfZNe

(1)Standalone-Cluster模式提交App后,会向Master请求启动Driver。
(2)Master接收请求之后,随机在集群中一台节点启动Driver进程。
(3)Driver启动后为当前的应用程序申请资源。
(4)Driver端发送task到worker节点上执行。
(5)worker将执行情况和执行结果返回给Driver端。

work端进程

4ceQUy

master端进程

JDwCHC

Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况的。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。

Yarn模式

Yarn-cluster

提交命令:

1
spark-submit  --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /usr/hdp/3.0.1.0-187/spark2/examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar

在集群模式下,Driver 运行在 Application Master 上,Application Master 进程同时负责驱动 Application 和从 YARN 中申请资源。该进程运行在 YARN Container 内,所以启动Application Master 的 Client 可以立即关闭,而不必持续到 Application 的声明周期。

Pz8yvg

(1)Client(客户端)生成作业信息提交给 ResourceManager。

(2)ResourceManager 在某一个 NodeManager (由 YARN 决定)启动 Container,并将Application Master 分配给该 NodeManager。

(3)NodeManager 接收到 ResourceManager 的分配,启动 Application Master 并初始化作业,此时 NodeManager 就称为 Driver。

(4)Application 向 ResourceManager 申请资源,ResourceManager 分配资源的同时通知其他 NodeManager 启动相应的 Executor。

(5)Executfor 向 NodeManager 上的 Application Master 注册汇报并完成相应的任务。图 1-19 是 YARN 客户端模式的作业运行流程。Application Master 仅仅从 YARN 中申请资源给 Executor,之后 Client 会与 Container通信进行作业的调度。

进程

gebTgh

应用的运行结果不能在客户端显示(可以在 history server 中查看),所以最好将结果保存在 HDFS 而非 stdout 输出,客户端的终端显示的是作为 Yarn 的 job 的简单运行状况。在此模式下,Driver运行在AM(ApplicationMaster)里,可以理解为AM包括了Driver的功能就像Driver运行在AM里一样,此时的AM既能够向AM申请资源并进行分配,又能完成Driver划分RDD提交task等工作。

Yarn-client

提交命令:

1
spark-submit  --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi /usr/hdp/3.0.1.0-187/spark2/examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar

下图是Yarn-client模式的作业运行流程。Application Master 仅仅从 YARN 中申请资源给 Etecutia,之后 Client会与 Container 通信进行作业的调度。

AjwTGM

YARN-Client 模式的作业运行流程描述如下。

(1)客户端生成作业信息提交给 ResourceManager。

(2)ResourceManager 在本地 NodeManager启动 Container,并将 Application Master分配给该 NodeManager。

(3)NodeManager 接收到 ResourceManager 的分配,启动 Application Master 并初始化作业,此时这个 NodeManager 就称为 Driver。

(4)Application 向 ResourceManager 申请资源,ResourceManager 分配资源同时通知其他 NodeMamager 启动相应的 Executor。

(5)Executor 向本地启动的 Application Master 注册汇报并完成相应的任务。

4Ta6xn

Driver运行在客户端上,先有driver再用AM,此时driver负责RDD生成、task生成和分发,向AM申请资源等 ,AM负责向RM申请资源,其他的都由driver来完成。

从上面两张图可看出 YARN-Cluster 和 YARN-Client 的区别。在 YARN-Cluster模式下,SparkDriver 运行在 Application Master(AM)中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN上运行,所以 YARN-Cluster 模式不适合运行交互类型的作业。然而在 YARN-Client 模式下,AM 仅仅向 YARN 请求 Executor,Client 会与请求得到的 Container 通信来调度它们工作,也就是说 Client 不能离开。

总结起来就是,集群模式的 Spark Driver 运行在 AM 中,而客户端模式的 Spark Driven运行在客户端。所以,YARN-Cluster 适用于生产,而 YARN-Client 适用于交互和调试,也就是希望快速地看到应用的输出信息。

Java大数据开发入门系列(三)————Hive

发表于 2020-08-18 分类于 大数据

文章更新中,请耐心期待

Java大数据开发入门系列(二)————HDFS

发表于 2020-08-18 更新于 2020-08-28 分类于 大数据

一、概述

HDFS (Hadoop Distributed File System)是 Hadoop 下的分布式文件系统,具有高容错、高吞吐量等特性,可以部署在低成本的硬件上。

二、起因

要发展大数据,首要问题就是考虑数据存储的问题,大数据存储存在以下核心问题:

  • 数据存储容量的问题,既然大数据要解决的是数以PB计的数据计算问题,而一般的服务器磁盘容量通常1-2TB,那么如何存储这么大规模的数据。
  • 数据读写速度的问题,一般磁盘的连续读写速度为几十MB,以这样的速度,几十PB的数据恐怕要读写到天荒地老。
  • 数据可靠性的问题,磁盘大约是计算机设备中最易损坏的硬件了,在网站一块磁盘使用寿命大概是一年,如果磁盘损坏了,数据怎么办?

在大数据技术出现之前,人们就需要面对这些关于存储的问题,对应的解决方案就是RAID技术。RAID(独立磁盘冗余阵列)技术主要是为了改善磁盘的存储容量,读写速度,增强磁盘的可用性和容错能力。目前服务器级别的计算机都支持插入多块磁盘(8块或者更多),通过使用RAID技术,实现数据在多块磁盘上的并发读写和数据备份。

常用RAID技术原理

1.RAID 0

数据在从内存缓冲区写入磁盘时,根据磁盘数量将数据分成N份,这些数据同时并发写入N块磁盘,使得数据整体写入速度是一块磁盘的N倍。读取的时候也一样,因此RAID0具有极快的数据读写速度,但是RAID0不做数据备份,N块磁盘中只要有一块损坏,数据完整性就被破坏,所有磁盘的数据都会损坏。

2.RAID 1

数据在写入磁盘时,将一份数据同时写入两块磁盘,这样任何一块磁盘损坏都不会导致数据丢失,插入一块新磁盘就可以通过复制数据的方式自动修复,具有极高的可靠性。

3.RAID 10

结合RAID0和RAID1两种方案,将所有磁盘平均分成两份,数据同时在两份磁盘写入,相当于RAID1,但是在每一份磁盘里面的N/2块磁盘上,利用RAID0技术并发读写,既提高可靠性又改善性能,不过RAID10的磁盘利用率较低,有一半的磁盘用来写备份数据。

4.RAID5

相比RAID3,更多被使用的方案是RAID5。RAID5和RAID3很相似,但是校验数据不是写入第N块磁盘,而是螺旋式地写入所有磁盘中。这样校验数据的修改也被平均到所有磁盘上,避免RAID3频繁写坏一块磁盘的情况。

5.RAID6

如果数据需要很高的可靠性,在出现同时损坏两块磁盘的情况下(或者运维管理水平比较落后,坏了一块磁盘但是迟迟没有更换,导致又坏了一块磁盘),仍然需要修复数据,这时候可以使用RAID6。

RAID6和RAID5类似,但是数据只写入N-2块磁盘,并螺旋式地在两块磁盘中写入校验信息(使用不同算法生成)。

RAID 等级 RAID0 RAID1 RAID3 RAID5 RAID6 RAID10
别名 条带 镜像 专用奇偶校验条带 分布奇偶校验条带 双重奇偶校验条带 镜像加条带
容错性 无 有 有 有 有 有
冗余类型 无 有 有 有 有 有
热备份选择 无 有 有 有 有 有
读性能 高 低 高 高 高 高
随机写性能 高 低 低 一般 低 一般
连续写性能 高 低 低 低 低 一般
需要磁盘数 n≥1 2n (n≥1) n≥3 n≥3 n≥4 2n(n≥2)≥4
可用容量 全部 50% (n-1)/n (n-1)/n (n-2)/n 50%

RAID技术只是在单台服务器的多块磁盘上组成阵列,大数据需要更大规模的存储空间和访问速度。因此,Hadoop结合前面的RAID技术和Google提出的GFS论文,创造了HDFS。HDFS(Hadoop分布式文件系统)是根据GFS(Google文件系统)的原理开发的,是GFS的简化版。

三、HDFS架构原理

HDFS的架构

HDFS的架构:主从架构,三大角色

  1. Namenode负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名,数据block的ID以及存储位置等信息,承担着操作系统中文件分配表(FAT)的角色。HDFS为了保证数据的高可用,会将一个block复制为多份(缺省情况为3份),并将三份相同的block存储在不同的服务器上。这样当有磁盘损坏或者某个DataNode服务器宕机导致其存储的block不能访问的时候,Client会查找其备份的block进行访问。
  2. Datanode负责文件数据的存储和读写操作,HDFS将文件数据分割成若干块(block),每个DataNode存储一部分block,这样文件就分布存储在整个HDFS服务器集群中。
  3. SecondaryNamenode严格意义上来说并不属于namenode的备份节点,它主要起到的作用其实是替namenode分担压力,降低负载(元数据的编辑日志合并,也就是edits log)之用

1.心跳机制

为了保证集群的高可用性和高可靠性(HA),DataNode会通过心跳和NameNode保持通信,如果DataNode超时未发送心跳,NameNode就会认为这个DataNode已经失效,立即查找这个DataNode上存储的block有哪些,以及这些block还存储在哪些服务器上,随后通知这些服务器再复制一份block到其他服务器上,保证HDFS存储的block备份数符合用户设置的数目,即使再有服务器宕机,也不会丢失数据。

心跳机制

1.NameNode启动之后,会开一个ipc server。NameNode 全权管理数据块的复制,它周期性从集群中的每个 DataNode 接收心跳信号和 block 状态报告,接收到心跳信号意味着该 DataNode 节点工作正常,块状态报告包含了该 DataNode 上所有数据块的列表

2.DataNode启动,连接NameNode注册,每隔3s向NameNode发送一个心跳,并携带状态信息,周期性地向 NameNode 上报 block 报告。NameNode 返回对该 DataNode 的指令,如将数据块复制到另一台机器,或删除某个数据块等,而当某一个 DataNode 超过10min还没向 NameNode 发送心跳,此时 NameNode 就会判定该 DataNode 不可用,此时客户端的读写操作就不会再传达到该 DataNode 上。

2.安全模式

Hadoop 集群刚开始启动时会进入安全模式,就用到了心跳机制。在集群刚启动的时候,每一个 DataNode 都会向 NameNode 发送 block 报告,NameNode 会统计它们上报的总block数,除以一开始知道的总个数total,当 block/total < 99.99% 时,会触发安全模式,安全模式下客户端就没法向HDFS写数据,只能进行读数据。

Namenode感知Datanode掉线死亡时间的计算公式为:

1
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval

HDFS默认超时时间为630秒,heartbeat.recheck.interval(重新检查的时间间隔) 的默认值为5分钟,而 dfs.heartbeat.interval(发送一次心跳的间隔) 默认值为3秒。

安全模式不仅仅是集群刚启动时等所有的Datanode汇报这一种情况会进入安全模式的,还有就是HDFS数据块丢失达到一个比例的时候,也会自动进入,这个比例默认是0.1%,1000个块丢1个已经很严重的事件了。

四、HDFS的使用

关于java api的说明:

FileSystem 是所有 HDFS 操作的主入口。

1
2
3
4
5
Configuration configuration = new Configuration();
// 这里我启动的是单节点的 Hadoop,所以副本系数设置为 1,默认值为 3
configuration.set("dfs.replication", "1");
// HDFS_PATH hdfs连接地址,HDFS_USER hdfs连接用户
FileSystem fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);

1. 显示当前目录结构

shell命令:

1
2
3
4
5
6
# 显示当前目录结构
hadoop fs -ls <path>
# 递归显示当前目录结构
hadoop fs -ls -R <path>
# 显示根目录下内容
hadoop fs -ls /

java API:

1
FileStatus[] statuses = fileSystem.listStatus(new Path("/"));

2. 创建目录

shell命令:

1
2
3
4
# 创建目录
hadoop fs -mkdir <path>
# 递归创建目录
hadoop fs -mkdir -p <path>

java API:

1
fileSystem.mkdirs(new Path("/test0/"));

3. 删除操作

shell命令:

1
2
3
4
# 删除文件
hadoop fs -rm <path>
# 递归删除目录和文件
hadoop fs -rm -R <path>

java API:

1
2
//返会 Boolean类型
fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);

4. 从本地加载文件到 HDFS

shell命令:

1
2
3
# 二选一执行即可
hadoop fs -put [localsrc] [dst]
hadoop fs - copyFromLocal [localsrc] [dst]

java API:

1
2
3
4
// 如果指定的是目录,则会把目录及其中的文件都复制到指定目录下
Path src = new Path("D:\\BigData-Notes\\notes\\installation");
Path dst = new Path("/hdfs-api/test/");
fileSystem.copyFromLocalFile(src, dst);

5. 从 HDFS 导出文件到本地

shell命令:

1
2
3
# 二选一执行即可
hadoop fs -get [dst] [localsrc]
hadoop fs -copyToLocal [dst] [localsrc]

java API:

1
2
3
4
5
6
7
8
9
10
Path src = new Path("/hdfs-api/test/kafka.tgz");
Path dst = new Path("D:\\app\\");
/*
* 第一个参数控制下载完成后是否删除源文件,默认是 true,即删除;
* 最后一个参数表示是否将 RawLocalFileSystem 用作本地文件系统;
* RawLocalFileSystem 默认为 false,通常情况下可以不设置,
* 但如果你在执行时候抛出 NullPointerException 异常,则代表你的文件系统与程序可能存在不兼容的情况 (window 下常见),
* 此时可以将 RawLocalFileSystem 设置为 true
*/
fileSystem.copyToLocalFile(false, src, dst, true);

6. 查看文件内容

shell命令:

1
2
3
# 二选一执行即可
hadoop fs -text <path>
hadoop fs -cat <path>

java API:

1
2
3
FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
String context = inputStreamToString(inputStream, "utf-8");
System.out.println(context);

7. 显示文件的最后一千字节

1
2
3
hadoop fs -tail  <path> 
# 和Linux下一样,会持续监听文件内容变化 并显示文件的最后一千字节
hadoop fs -tail -f <path>

8. 拷贝文件

shell命令:

1
hadoop fs -cp [src] [dst]

9. 移动文件

shell命令:

1
hadoop fs -mv [src] [dst]

10. 统计当前目录下各文件大小

shell命令:

  • 默认单位字节
  • -s : 显示所有文件大小总和,
  • -h : 将以更友好的方式显示文件大小(例如 64.0m 而不是 67108864)
1
hadoop fs -du  <path>

11. 合并下载多个文件

shell命令:

  • -nl 在每个文件的末尾添加换行符(LF)
  • -skip-empty-file 跳过空文件
1
2
3
hadoop fs -getmerge
# 示例 将HDFS上的hbase-policy.xml和hbase-site.xml文件合并后下载到本地的/usr/test.xml
hadoop fs -getmerge -nl /test/hbase-policy.xml /test/hbase-site.xml /usr/test.xml

12. 统计文件系统的可用空间信息

shell命令:

1
hadoop fs -df -h /

13. 更改文件复制因子

shell命令:

1
hadoop fs -setrep [-R] [-w] <numReplicas> <path>
  • 更改文件的复制因子。如果 path 是目录,则更改其下所有文件的复制因子
  • -w : 请求命令是否等待复制完成
1
2
# 示例
hadoop fs -setrep -w 3 /user/hadoop/dir1

14. 权限控制

shell命令:

1
2
3
4
5
6
7
# 权限控制和Linux上使用方式一致
# 变更文件或目录的所属群组。 用户必须是文件的所有者或超级用户。
hadoop fs -chgrp [-R] GROUP URI [URI ...]
# 修改文件或目录的访问权限 用户必须是文件的所有者或超级用户。
hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]
# 修改文件的拥有者 用户必须是超级用户。
hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]

15. 文件检测

shell命令:

1
hadoop fs -test - [defsz]  URI

可选选项:

  • -d:如果路径是目录,返回 0。
  • -e:如果路径存在,则返回 0。
  • -f:如果路径是文件,则返回 0。
  • -s:如果路径不为空,则返回 0。
  • -r:如果路径存在且授予读权限,则返回 0。
  • -w:如果路径存在且授予写入权限,则返回 0。
  • -z:如果文件长度为零,则返回 0。
1
2
# 示例
hadoop fs -test -e filename

java API:

1
2
// 返会Boolean类型
fileSystem.exists(new Path("/hdfs-api/test/a.txt"));

Java大数据开发入门系列(一)————环境搭建

发表于 2020-08-18 更新于 2020-08-31 分类于 大数据

环境

  • MacOS10.15

  • Parallels Desktop16创建搭建的centos8 虚拟机

  • 集群环境:

    IP地址 主机名 操作系统 角色 服务
    10.211.55.8/24 master Centos8 master HDFS(NameNode)、DFSZKFailoverController(zkfc)、SYNC(同步文件服务器)、ResourceManager(资源分配与调度)
    10.211.55.9/24 slave1 Centos8 slave1 Zookeeper、HDFS(SecondaryNamenode)、MapReduce(JobHistoryServer)、NodeManager
    10.211.55.10/24 slave2 Centos8 Slave2 Zookeeper、HDFS(DataNode)、NodeManager
    10.211.55.11/24 slave3 Centos8 Slave3 Zookeeper、HDFS(DataNode)、NodeManager

开始搭建

防火墙和SELINUX设置

因为Hadoop需要开启的端口很多,而且牵涉到很多的权限,所以我们在测试时将防火墙和SELINUX都关掉。
在生产环境中,需要针对不同的开放端口做针对性的设置。

关闭防火墙

运行以下命令,关闭防火墙

1
2
3
# 临时关闭防火墙 和 禁止开机启动防火墙
systemctl stop firewalld && systemctl disable firewalld
systemctl status firewalld #查看防火墙状态。

关闭SELINUX

运行以下命令:

1
vi /etc/selinux/config         #SELINUX配置文件

相关参数修改如下:

1
2
#SELINUX=enforcing
SELINUX=disabled

重启服务器,然后查看SELINUX状态:

1
2
3
reboot       #重启
getenforce #查询SELinux的运行模式,permissive(宽容模式);enforcing(强制模式);
/usr/sbin/sestatus -v #查看SELINUX的状态

显示如下内容,则说明SELINUX已经关闭了

1
SELinux status:                 disabled

修改主机名

以master(10.211.55.8)节点为例,运行下面的命令,修改本机的hostname

1
2
#修改hostname
hostnamectl set-hostname master

运行下面的命令查看设置好的hostname

1
cat /etc/hostname

如果显示如下的内容,则说明修改成功

1
master

其他节点(10.211.55.9~11)同样进行以上操作,修改主机名。

hosts设置

由于一次次的远程连接需要输入IP地址,不利于管理和使用,我们可以在hosts里面把服务器的hostname跟IP地址对应起来。

在master(10.211.55.8)输入以下命令

1
vi /etc/hosts

设置成以下内容

1
2
3
4
5
6
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
10.211.55.8 master
10.211.55.9 slave1
10.211.55.10 slave2
10.211.55.11 slave3

其他节点(10.211.55.9~11)同样进行以上操作,修改hosts。

添加Hadoop用户

1
useradd hadoop && passwd hadoop

命令输入完毕后设置用户密码就行

可为 hadoop 用户增加管理员权限,方便部署,避免一些对新手来说比较棘手的权限问题,执行:

1
visudo

Shell 命令

如下图,找到 root ALL=(ALL) ALL 这行(应该在第98行,可以先按一下键盘上的 ESC 键,然后输入 :98 (按一下冒号,接着输入98,再按回车键),可以直接跳到第98行 ),然后在这行下面增加一行内容:

1
hadoop  ALL=(ALL)       ALL

如下图所示:

RhXSSv

切换到hadoop用户

1
2
su hadoop
cd ~

每个节点都执行一次

配置免密登录

在所有的节点上执行以下命令

1
2
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa         #生成验证密钥
cat ~/.ssh/id_rsa.pub|ssh hadoop@master "cat - >> ~/.ssh/authorized_keys" #发送给主服务器

如果需要互相免密码登录,则master执行下面命令,把密钥分发给从服务器

1
2
3
scp ~/.ssh/authorized_keys hadoop@slave1:~/.ssh/authorized_keys
scp ~/.ssh/authorized_keys hadoop@slave2:~/.ssh/authorized_keys
scp ~/.ssh/authorized_keys hadoop@slave3:~/.ssh/authorized_keys

在所有的节点执行以下命令

1
chmod 0600 ~/.ssh/authorized_keys

退出hadoop用户,再重新 su hadoop进入一次即可免密登录了

验证免密登录

oE23jf

安装JDK

创建upload目录

1
mkdir upload

将下载下来的jdk上传到master节点的/home/hadoop/upload下,jdk下载地址

执行以下命令

1
2
3
4
sudo mkdir /usr/java
cd /home/hadoop/upload
tar -zxvf jdk-8u261-linux-x64.tar.gz #解压JDK
sudo sudo mv jdk1.8.0_261/ /usr/java/ #将JDK移动(剪切)到/usr/java/目录

修改全局环境变量

1
sudo vi /etc/profile       #文件底部添加以下内容

在文件底部添加以下内容

1
2
3
# jdk
export JAVA_HOME="/usr/java/jdk1.8.0_261"
export PATH=$JAVA_HOME/bin:$PATH

加载新的全局环境变量

1
source /etc/profile

执行以下命令验证jdk是否安装成功

1
java -version

LZmlJj

将jdk发送到其他节点

1
2
3
4
5
6
7
cd /usr/java

ssh root@slave1 'mkdir -p /usr/java/jdk1.8.0_261' && sudo scp -r /usr/java/jdk1.8.0_261/ root@slave1:$PWD

ssh root@slave2 'mkdir -p /usr/java/jdk1.8.0_261' && sudo scp -r /usr/java/jdk1.8.0_261/ root@slave2:$PWD

ssh root@slave3 'mkdir -p /usr/java/jdk1.8.0_261' && sudo scp -r /usr/java/jdk1.8.0_261/ root@slave3:$PWD

将环境变量发送到其他节点

1
2
3
4
5
sudo scp /etc/profile root@slave1:/etc/profile

sudo scp /etc/profile root@slave2:/etc/profile

sudo scp /etc/profile root@slave3:/etc/profile

在slave1、slave2、slave3上分别执行以下命令,让环境变量生效

1
source /etc/profile

安装Zookeeper

安装并配置Zookeeper

由于Zookeeper类似于民主选举,每台服务器分别投票共同选举一个作为leader,剩下的都是follower。基于这个原因,官方建议服务器集群设置为奇数台,偶数台的话会有一台的资源浪费。根据咱们的集群规划:slave1~3为我们的Zookeeper服务器。

在slave1执行以下命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mkdir /home/hadoop/upload

su root

cd /home/hadoop/upload

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz

mkdir /home/hadoop/server && mkdir -p /home/hadoop/data/zkdata && mkdir -p /home/hadoop/log/zklog

mv apache-zookeeper-3.6.1-bin/ /home/hadoop/server/

sudo chown -R hadoop:hadoop /home/hadoop/*

修改全局环境变量

1
sudo vi /etc/profile       #文件底部添加以下内容

在文件底部添加以下内容

1
2
3
# zookeeper
export ZOOKEEPER_HOME="/home/hadoop/zookeeper"
export PATH=$ZOOKEEPER_HOME/bin:$PATH

加载新的全局环境变量

1
2
3
source /etc/profile  
# 切换到hadoop用户
su hadoop

将Zookeeper发送到slave2、slave3

1
2
3
4
5
cd /home/hadoop

ssh root@slave2 'mkdir -p /home/hadoop/server && mkdir -p /home/hadoop/data/zkdata && mkdir -p /home/hadoop/log/zklog' && sudo scp -r /home/hadoop/server/ root@slave2:$PWD && sudo scp -r /home/hadoop/data/ root@slave2:$PWD && sudo scp -r /home/hadoop/log/ root@slave2:$PWD

ssh root@slave3 'mkdir -p /home/hadoop/server && mkdir -p /home/hadoop/data/zkdata && mkdir -p /home/hadoop/log/zklog' && sudo scp -r /home/hadoop/server/ root@slave3:$PWD && sudo scp -r /home/hadoop/data/ root@slave3:$PWD && sudo scp -r /home/hadoop/log/ root@slave3:$PWD

将环境变量发送到slave2、slave3

1
sudo scp /etc/profile root@slave2:/etc/profile && sudo scp /etc/profile root@slave3:/etc/profile

在slave2、slave3上分别执行以下命令,让环境变量生效

1
source /etc/profile

使用hadoop执行以下命令

1
2
cd /home/hadoop/server/apache-zookeeper-3.6.1-bin/conf
sudo cp zoo_sample.cfg zoo.cfg

修改zoo.cfg的文件内容

1
vi zoo.cfg
1
2
3
4
5
dataDir=/home/hadoop/data/zkdata
dataLogDir=/home/hadoop/log/zklog/
server.1=slave1:2888:3888
server.2=slave2:2888:3888
server.3=slave3:2888:3888

将zoo.cfg分发到slave2~3

1
sudo scp /home/hadoop/server/apache-zookeeper-3.6.1-bin/conf/zoo.cfg hadoop@slave2:/home/hadoop/server/apache-zookeeper-3.6.1-bin/conf/zoo.cfg && sudo scp /home/hadoop/server/apache-zookeeper-3.6.1-bin/conf/zoo.cfg hadoop@slave3:/home/hadoop/server/apache-zookeeper-3.6.1-bin/conf/zoo.cfg

最后不要忘了在每个服务器“/home/hadoop/data/zkdata/”下新建文件“myid”并把当前服务器编号写进去,举例:

1
2
3
sudo chown -R hadoop:hadoop /home/hadoop/*
# slave1是1,slave2是2,slave3是3
echo 1 > /home/hadoop/data/zkdata/myid

以下内容只能在slave1~3上执行才能看到正确的结果:

1
2
3
zkServer.sh stop    #停止Zookeeper服务
zkServer.sh start #开启Zookeeper服务
zkServer.sh status #开启Zookeeper服务

执行zkServer.sh start正常情况下会看到下面的内容:

PW9TW7

如果启动失败,可以到“/home/hadoop/zookeeper/logs/”这个目录里面看看启动日志。

录入下面的命令:

1
jps

可以看到如下的结果:

FHDioK

注意:虽然我们在配置文件中写明了服务器的列表信息,但是,我们还是需要去每一台服务 器去启动,不是一键启动集群模式。
每启动一台查看一下状态再启动下一台
三台机器上都要有QuorumPeerMain进程,都能显示follower或者leader

安装Hadoop

把下载好的Hadoop安装包上传到master节点的/home/hadoop/upload目录下面。

hadoop下载地址

先通过root账户执行以下操作

1
2
3
4
5
6
su root
mkdir -p cd /home/hadoop/server
cd /home/hadoop/server
tar -zxvf hadoop-3.3.0.tar.gz
mv /home/hadoop/upload/hadoop-3.3.0/ /home/hadoop/server/
chown -R hadoop:hadoop /home/hadoop/*

修改全局环境变量

1
sudo vi /etc/profile       #文件底部添加以下内容

在文件底部添加以下内容

1
2
3
# Hadoop
export HADOOP_HOME="/home/hadoop/server/hadoop-3.3.0"
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

加载新的全局环境变量

1
source /etc/profile

输入echo $HADOOP_HOME显示如下则表明成功

6RkiD2

在其他节点同样执行以上添加环境变量的操作

进入master节点,创建相关目录,执行以下命令

1
2
3
4
su hadoop
rm -rf /home/hadoop/server/hadoop-3.3.0/share/doc #删除文档,很大,又没用
mkdir -p /home/hadoop/data/dfs/data && mkdir /home/hadoop/data/dfs/name && mkdir /home/hadoop/data/dfs/tmp && mkdir /home/hadoop/data/journaldata
cd /home/hadoop/server/hadoop-3.3.0/etc/hadoop

修改hadoop-env

1
vi /home/hadoop/server/hadoop-3.3.0/etc/hadoop/hadoop-env.sh

添加一行内容:

1
export JAVA_HOME="/usr/java/jdk1.8.0_261"

修改配置文件core-site.xml

1
vi /home/hadoop/hadoop/etc/hadoop/core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<configuration>
<!-- 指定hdfs的nameservice为master -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:9000</value>
</property>
<!-- 指定hadoop临时目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/data/dfs/tmp</value>
</property>
<!-- 指定zookeeper地址 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>slave1:2181,slave2:2181,slave3:2181</value>
</property>
<!-- hadoop链接zookeeper的超时时长设置 -->
<property>
<name>ha.zookeeper.session-timeout.ms</name>
<value>1000</value>
<description>ms</description>
</property>
<!-- 修改core-site.xml中的ipc参数,防止出现连接journalnode服务ConnectException -->
<property>
<name>ipc.client.connect.max.retries</name>
<value>100</value>
<description>Indicates the number of retries a client will make to establish a server connection.</description>
</property>
<property>
<name>ipc.client.connect.retry.interval</name>
<value>10000</value>
<description>Indicates the number of milliseconds a client will wait for before retrying to establish a server connection.</description>
</property>
<property>
<name>topology.script.file.name</name>
<value>10000</value>
<description>Indicates the number of milliseconds a client will wait for before retrying to establish a server connection.</description>
</property>
</configuration>

修改配置文件hdfs-site.xml

1
vi /home/hadoop/hadoop/etc/hadoop/hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<configuration>
<!-- 指定副本数 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<!-- 配置namenode和datanode的工作目录-数据存储目录 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/hadoop/data/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/hadoop/data/dfs/data</value>
</property>
<!-- 启用webhdfs -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<!-- 配置HDFS的权限控制 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<!-- 配置SecondaryNameNode的节点地址 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>slave1:50090</value>
</property>
</configuration>

修改配置文件mapred-site.xml

1
vi /home/hadoop/hadoop/etc/hadoop/mapred-site.xml

全部内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<configuration>
<!-- 指定mr框架为yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

<!-- 指定mapreduce jobhistory地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>slave1:10020</value>
</property>

<!-- 任务历史服务器的web地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>slave1:19888</value>
</property>
</configuration>

修改配置文件yarn-site.xml

1
vi /home/hadoop/hadoop/etc/hadoop/yarn-site.xml

内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<configuration>
<!-- 以逗号分隔的服务列表,其中服务名称应仅包含a-zA-Z0-9_并且不能以数字开头-->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 配置ResourceManager的服务节点 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>

<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>

<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
</configuration>

修改配置文件workers

1
vi /home/hadoop/hadoop/etc/hadoop/workers

文件中配置的是DataNode的所在节点服务

1
2
slave2
slave3

修改yarn-env

1
vi /home/hadoop/hadoop/etc/hadoop/yarn-env.sh

添加一行内容:

1
export JAVA_HOME="/usr/java/jdk1.8.0_261"

修改mapred-env

1
vi /home/hadoop/hadoop/etc/hadoop/mapred-env.sh

添加一行内容:

1
export JAVA_HOME="/usr/java/jdk1.8.0_261"

将hadoop安装包分发到其他集群节点

1
2
3
4
5
cd /home/hadoop/server
scp -r /home/hadoop/server/hadoop-3.3.0/ hadoop@slave1:$PWD && scp -r /home/hadoop/server/hadoop-3.3.0/ hadoop@slave2:$PWD && scp -r /home/hadoop/server/hadoop-3.3.0/ hadoop@slave3:$PWD

cd /home/hadoop/data
scp -r /home/hadoop/data/dfs/ hadoop@slave1:$PWD && scp -r /home/hadoop/data/journaldata hadoop@slave1:$PWD && scp -r /home/hadoop/data/dfs/ hadoop@slave2:$PWD && scp -r /home/hadoop/data/journaldata hadoop@slave2:$PWD && scp -r /home/hadoop/data/dfs/ hadoop@slave3:$PWD && scp -r /home/hadoop/data/journaldata hadoop@slave3:$PWD

格式化HDFS

在master节点中格式化HDFS(只在master节点,即NameNode执行),执行以下命令

1
hdfs namenode -format

格式化zkfc

在master节点中格式化HDFS(只在master节点,即NameNode执行),执行以下命令

1
hdfs zkfc -formatZK

启动Hadoop集群

日常启动hadoop

Hadoop启动顺序:Zookeeper->Hadoop->Hbase>Hive…

停止所有服务

1
2
3
stop-all.sh                            #最好在master(NameNode)上执行
mapred --daemon stop historyserver #master上执行
zkServer.sh stop #slave1~3上操作

启动ZooKeeper

在slave1~3上分别执行以下命令:

1
2
3
zkServer.sh stop
zkServer.sh start
jps

显示如下内容,则启动成功

0VF4xe

启动HDFS

其中一台机器执行就OK了,比如:master

1
2
stop-dfs.sh    #先停掉服务
start-dfs.sh #如果出现错误,则在hadoop-env.sh中,再显示地重新声明一遍JAVA_HOME

显示内容如下:

fjyyxM

执行命令查看:

1
jps

master上显示如下:

2yw9A8

Slaver1~3显示如下:

FYF63v

pRRr0o

启动YARN

在master进行启动:

1
2
stop-yarn.sh
start-yarn.sh

显示内容如下:

y1tDm5

执行命令查看:

1
jps

显示如下:

vwyzNP

若备用节点的 resourcemanager 没有启动起来,则手动启动起来

1
yarn-daemon.sh start resourcemanager

启动 mapreduce 任务历史服务器

在slave1上执行如下命令:

1
2
mapred --daemon stop historyserver
mapred --daemon start historyserver

执行命令查看:

1
jps

显示如下:

JfbHLC

WEB控制台

hdfs控制台

http://10.211.55.8:9870/dfshealth.html#tab-overview

![image-20200830213217841](/Users/roger/Library/Application Support/typora-user-images/image-20200830213217841.png)

yarn控制台

http://10.211.55.8:8088/cluster

nq8zHG

SpringBoot中使用flyway做好数据库版本控制

发表于 2020-08-16 分类于 spring boot

为什么需要数据库版本控制

​ 在真实的项目开发中,我们一般有三套环境:开发、测试、生产。在开发阶段我们一般都是在开发环境中进行操作,项目开发肯定不止一个人,在开发的过程中,我们肯定会对数据库的库表进行一些修改操作。并且这些操作需要同步到这三套环境中。但是多人协同,人为操作难免会出现疏忽,有时候修改了开发环境忘记了去修改其他环境。很多情况下都需要对数据库的变化做跟踪,以便于我们回退到某个版本。因此我们需要有这样一个工具来帮助我们管理数据库版本,做好各个环境同步更新。

为什么选择Flyway

​ 现在比较常用的数据库版本管理工具有Flyway和Liquibase。Spring Boot提供了这两者的内建支持,可以很快应用到产品中。

​ 使用Flyway的好处在于使用简单,直接书写我们比较熟悉的sql脚本即可,不需要进行额外的学习。Liquibase的优点在于它能跨平台、跨库,但是需要我们花时间去学习他的脚本编写规则。如果你的项目中没有跨数据库的需要,那么flyway完全够用了。

Flyway的工作模式

​ flyway在项目运行的时候会判断你的数据库中是否存在flyway_schema_history表,如果没有就会创建。这个表主要用于记录数据库的状态,Flyway的版本控住主要也是依赖这张表的。

​ 当flyway_schema_history这张表存在,。当检测到你有新的版本需要迁移的时候,Flyway会逐一对比flyway_schema_history表中的已存在的版本记录,如果有未应用的Migrations,Flyway会获取这些Migrations并按版本号次序迁移到数据库中。

TkDIKR

应用每个迁移时,flyway_schema_history表将相应更新:

GC3nO8

​ flyway在升级数据库的时候,会检查已经执行过的版本对应的脚本是否发生变化,包括脚本文件名,以及脚本内容。如果flyway检测到发生了变化,则抛出错误,并终止升级。

​ 如果已经执行过的脚本没有发生变化,flyway会跳过这些脚本,依次执行后续版本的脚本,并在记录表中插入对应的升级记录。

​ 所以,flyway总是幂等的,而且可以支持跨版本的升级。

​ Migrations就是我们用SQL编写的脚本。为了让我们编写的SQL脚本生效,还需要按照Flyway指定的方式进行命名。

nuNib6

  • Prefix: V代表版本化,U代表撤销,R代表可重复迁移。
  • Version: 带点或下划线的版本,可以随意分隔多个部分(不适合重复迁移)。
  • Separator: __ (两个下划线)
  • Description: 下划线或空格分隔单词
  • Suffix: .sql

如何使用

1、添加依赖

1
2
3
4
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>

2、配置application.yml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?useSSL=false&allowMultiQueries=true
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver

# 数据库版本控制
flyway:
# 启用或禁用 flyway
enabled: true
# 字符编码
encoding: utf-8
# 对执行迁移时基准版本的描述
baseline-description: test
# 若连接的数据库非空库,是否初始化
# 当迁移时发现目标schema非空,而且带有没有元数据的表时,是否自动执行基准迁移,默认false.
baseline-on-migrate: true
# 指定 baseline 的版本号,缺省值为 1, 低于该版本号的 SQL 文件, migrate 的时候被忽略
# 开始执行基准迁移时对现有的schema的版本打标签,默认值为1.
baseline-version: 0
# 是否开启校验
# 迁移时是否校验,默认为 true
validate-on-migrate: true
# 默认脚本加载路径:/db/migration
# locations: ["classpath:/db/migration"]
# flyway 的 clean 命令会删除指定 schema 下的所有 table,默认 false
clean-disabled: false
# 发环境最好开启 outOfOrder, 生产环境关闭 outOfOrder
# 是否允许无序的迁移,默认 false
out-of-order: false
# 检查迁移脚本的位置是否存在,默认false
check-location: false
# 当读取元数据表时是否忽略错误的迁移,默认false
ignore-future-migrations: false
# 当初始化好连接时要执行的SQL
init-sqls: show tables;
# 迁移时使用的JDBC URL,如果没有指定的话,将使用配置的主数据源
# url:
# 迁移数据库的用户名
# user:
# 目标数据库的密码
# password:
# 设置每个placeholder的前缀,默认${
#placeholder-prefix:
# 是否要被替换,默认true
#placeholder-replacement:
# 设置每个placeholder的后缀,默认}
#placeholder-suffix:
# 设置placeholder的value
#placeholders.[placeholder name]
# 设定需要flywary迁移的schema,大小写敏感,默认为连接默认的schema
#schemas: flyway
# 迁移文件的前缀,默认为V
#sql-migration-prefix:
# 迁移脚本的文件名分隔符,默认__
#sql-migration-separator:
# 迁移脚本的后缀,默认为.sql
#sql-migration-suffix:
# 使用的元数据表名,默认为schema_version
#table: flyway_schema_history
# 迁移时使用的目标版本,默认为latest version
#target:

flyway下没有配置url、user、password的话将会使用springboot的数据源。

3、在db.migration包下新建V1__initialization_table.sql文件。

1
2
3
4
5
6
DROP TABLE IF EXISTS user;
CREATE TABLE `user` (
`id` bigint(20) unsigned zerofill NOT NULL AUTO_INCREMENT COMMENT '自增ID',
`user_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

flyway脚本默认放在classpath:/db/migration目录下面,如果想换位置,可以自定义一个位置spring.flyway.locations即为脚本存放的位置。

4、开始验证

目前只有test库中没有任何表:

JEo9XF

我们开始启动项目看看:

NQVITI

通过日志信息,我们可以看到已经迁移成功了,我们去数据库看看。

PI8774

表结构:

UTMMNl

现在我们来给user表新增一个字段试试。

V1.1__addField_col.sql

1
alter table user add mail varchar(128) comment '用户邮箱';

运行项目看看

hIlzSt

表结构:

IDeE9S

SpringBoot优雅的参数校验-validation

发表于 2020-07-25 更新于 2020-07-26 分类于 spring boot

SpringBoot优雅的参数校验-validation

前言

    在后端开发过程中,我们避免不了对前端传过来的参数进行检验。有的人会说,前端检验不就行了吗,为啥还要后端检验一遍?我们前端经常对我说的一句话:”后端不要信任前端,你怎么知道接口不会被拦截,前端就一定不会传错。“。因此,参数校验是非常重要的一个环节,严格参数校验会减少很多出bug的概率,增加接口的安全性,增强程序的健壮性。

    当我们在做参数检验的时候,可能会出现这种情况:参数太多,我们要写很多条件判断语句,这样就会显得代码不够整洁,阅读体验也不是很好。
SKO0yt
    因此我们需要一种优雅的方式来处理上面的问题,进行SpringBoot统一参数校验。那就是今天我们的主角validation啦。

使用方式

    通过@Validated这一注解配合一些参数校验注解(PS:@NotNull,@NotEmpty)。然后对抛出的异常进行全局统一捕获然后返回错误信息。

引入依赖

1
2
3
4
5
<!-- Validation -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

PostParams

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.example.spring_boot_validation.entity.params;

import lombok.Data;

import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.List;

/**
* @Author: Roger
* @description: post参数类
* @date: 2020/7/25 4:31 下午
*/
@Data
public class PostParams {
/**
* ID
*/
@NotNull(message = "ID不能为空")
private Integer id;
/**
* 名称
*/
@NotNull(message = "名称不能为空")
private String name;
/**
* 数组
*/
@NotEmpty(message = "数组里面至少有一个元素")
private List<Integer> array;
/**
* 对象
*/
@Valid
@NotNull(message = "item不能为空")
private Item item;
}

TestController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@RestController
@Validated
public class TestController {
@GetMapping("/get-test")
public ResponseEntity<Object> getTest(@RequestParam(required = false) @NotNull(message = "offset不能为空") Integer offset,
@RequestParam(required = false) @NotNull(message = "limit不能为空") Integer limit){
return new ResponseEntity<>("ok", HttpStatus.OK);
}

@PostMapping("/post-test")
public ResponseEntity<Object> postTest(@Valid @RequestBody PostParams postParams){
return new ResponseEntity<>(HttpStatus.CREATED);
}

@GetMapping("/get-test2")
public ResponseEntity<Object> getTest2(@RequestParam(required = false) @NotNull(message = "offset不能为空") Integer offset,
@RequestParam(required = false) Integer limit){
return new ResponseEntity<>("ok", HttpStatus.OK);
}

@GetMapping("/get-test3")
public ResponseEntity<Object> getTest3(@Valid Item item){
return new ResponseEntity<>("ok", HttpStatus.OK);
}

@GetMapping("/get-test4")
public ResponseEntity<Object> getTest4(@NotNull(message = "offset不能为空") Integer offset,
Integer limit){
return new ResponseEntity<>("ok", HttpStatus.OK);
}
}

ControllerAdvice

参数检验异常统一拦截处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@RestControllerAdvice
public class ControllerAdvice {

//参数检验错误 @RequestParam上validate失败后抛出的异常是javax.validation.ConstraintViolationException
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler
public ResultVo<Object> handlerMethodArgumentNotValidException(final MethodArgumentNotValidException e) {
List<ObjectError> objectErrors = e.getBindingResult().getAllErrors();
StringBuilder errorMessages = new StringBuilder();
objectErrors.forEach(objectError -> errorMessages.append(objectError.getDefaultMessage()).append(";"));
return new ResultVo<>(0,String.valueOf(errorMessages),null);
}

//参数检验错误 @RequestBody上validate失败后抛出的异常是MethodArgumentNotValidException异常。
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler
public ResultVo<Object> handlerConstraintViolationException (final ConstraintViolationException e) {
String errorMessages = e.getConstraintViolations().stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));
return new ResultVo<>(0,String.valueOf(errorMessages),null);
}

//参数检验错误 validate失败后抛出的异常是BindException异常。
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler
public ResultVo<Object> handlerConstraintViolationException (final BindException e) {
String errorMessages = e.getBindingResult().getAllErrors().stream().map(ObjectError::getDefaultMessage).collect(Collectors.joining(";"));
return new ResultVo<>(0,String.valueOf(errorMessages),null);
}
}

​ 在使用参数检验的过程中,主要会抛出以上三种异常BindException、MethodArgumentNotValidException、ConstraintViolationException。原因主要是因为跟请求发起的数据格式(content-type)有关系,对于不同的传输数据的格式spring采用不同的HttpMessageConverter(http参数转换器)来进行处理。

​ 请求体(@RequestBody)绑定到java bean上失败时抛出MethodArgumentNotValidException;普通参数(@RequestParam)(非 java bean)校验出错时抛出ConstraintViolationException;请求参数绑定到java bean上失败时抛出BindException;

测试

请求POST: http://localhost:8080/post-test ,结果如下

QwBokF

常用的一些注解解析

@Validated 和 @Valid 的异同
注解 范围 嵌套 校验组
@Validated 可以标记类、方法、方法参数,不能用在成员属性(字段)上 不支持 支持
@Valid 可以标记方法、构造函数、方法参数和成员属性(字段)上 支持 不支持

通常在使用过程中,我们把@Validated标记在类上,然后@Valid标记在实体中的属性上。在Controller中使用,把@Validated标记在类上。然后针对java bean的参数就用@Valid注解。如下所示:

0LcqcD

校验注解一览表

注解 作用
@Valid 被注释的元素是一个对象,需要检查此对象的所有字段值
@Null 被注释的元素必须为 null
@NotNull 被注释的元素必须不为 null
@AssertTrue 被注释的元素必须为 true
@AssertFalse 被注释的元素必须为 false
@Min(value) 被注释的元素必须是一个数字,其值必须大于等于指定的最小值
@Max(value) 被注释的元素必须是一个数字,其值必须小于等于指定的最大值
@DecimalMin(value) 被注释的元素必须是一个数字,其值必须大于等于指定的最小值
@DecimalMax(value) 被注释的元素必须是一个数字,其值必须小于等于指定的最大值
@Size(max, min) 被注释的元素的大小必须在指定的范围内
@Digits (integer, fraction) 被注释的元素必须是一个数字,其值必须在可接受的范围内
@Past 被注释的元素必须是一个过去的日期
@Future 被注释的元素必须是一个将来的日期
@Pattern(value) 被注释的元素必须符合指定的正则表达式
@Email 被注释的元素必须是电子邮箱地址
@Length(min=, max=) 被注释的字符串的大小必须在指定的范围内
@NotEmpty 被注释的字符串的必须非空
@Range(min=, max=) 被注释的元素必须在合适的范围内
@NotBlank 被注释的字符串的必须非空
@URL(protocol=,host=, port=,regexp=, flags=) 被注释的字符串必须是一个有效的url

嵌套验证

我们很多时候会存在这样的业务场景,前端会给后端传递一个list,我们不仅要限制每次请求list内的个数,同时还要对list内基本元素的属性值进行校验。这个时候就需要进行嵌套验证了,实现的方式很简单。在list上添加@Vaild就可以实现了。

UqPx8n

12
Roger

Roger

17 日志
9 分类
18 标签
渝ICP备19010391号-1 © 2021 Roger
由 Hexo 强力驱动 v3.9.0
|
主题 – NexT.Pisces v7.2.0