spark源码的编译_spark reducebykey 源码

hacker|
140

文章目录:

spark thrift server 与 网易 kyuubi thrift server

thrift server可以实现通过jdbc, beeline等工具,实现连接到spark集群,并提交sql查询的机制。

默认情况下,cdh安装的spark没有包含thrift server模块,因此我们需要重新编译spark。

另外,为了不影响cdh自带的spark,而且spark目前都是基于yarn运行的,本身也没有什么独立的服务部署(除了history sever)。

所以,在一个集群中,可以部署安装多个版本的spark。

我们使用源码编译的spark 2.4.0(其中hive的版本是1.2.1)

cdh集成的spark版本和Hive版本如下:

使用jdk1.8

修改spark提供的mvn,使用自行安装的maven 3.8.1

使用make-distribution.sh可以帮助与我们编译之后打包成tgz文件

修改pom.xml文件的配置如下。

最后,执行编译命令如下:

这样打出的包,就含有thrift server的jar包了。

最终打包文件,根目录下。

之后就是解压到其他目录下后即可。

将hive-site.xml的文件连接过来,这样spark就可以读取hive的表了。

为了确保spark提交到yarn上运行,需要配置

cp spark-defaults.conf.template spar-defaults.conf

另外,可以在spark-env.sh中设置环境变量。

HADOOP_CONF_DIR

环境变量,也可以在/etc/profile中设置

启动日志可以查看,注意下端口占用问题,如下。

启动时候,使用beeline工具连接上,主要这里不用使用cdh默认安装hive提供的beeline工具,应为版本太高。

使用编译后spark生成beeline工具

参考beeline使用教程。

kyuubi是基于thrift sever二次开发,在系能和安全上优于thrift server。

鉴于目前hive的版本是2.1,而最新的kyuubi的hive是2.3,所以采用前天版本的kyuubi,采用0.7版本,保证hive的版本小于当前集群中的hive版本。

使用build目录下的dist脚本进行编译和打包。

编译成功后,会在更目录下出现tar.gz的压缩文件,如上图。

之后解压到目录下。

配置bin/kyuubi-env.sh脚本,设置spark路径

执行bin/start-kyuubi.sh命令即可。

访问的方式同样采用beelin,注意使用上面章节的beeline工具。

访问后,可以通过beeline访问到hive的表(在spark中已经配置了hive-site.xml)

!connect jdbc: hive2://xxxx:10009 即可。

可能是全网最详细的 Spark Sql Aggregate 源码剖析

纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。

本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的

创建聚合分为两个阶段:

AggregateExpression 共有以下几种 mode:

Q:是否支持使用 hash based agg 是如何判断的?

摘自我另一篇文章:

为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为

这样就能进入 HashAggregateExec 的分支

构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化

在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:

可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。

此迭代器:

注:UnsafeKVExternalSorter 的实现可以参考:

UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:

使用 UnsafeRow 的收益:

构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现

上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:

上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类

AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。

DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:

我们再次以容易理解的 Count 来举例说明:

通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:

再来看看 AggregationIterator#processRow

AggregationIterator#processRow 会调用

生成用于处理一行数据(row)的函数

说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:

比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:

对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:

对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:

对于 Final 的 Collect 来说就是调用其 merge 方法,即:

我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。

分为两种情况,分别是:

Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~

怎么用Eclipse搭建Spark源码阅读环境

第一部分、软件安装

1、 安装JDK (版本为1.7.0_11)

2、 安装Scala (版本为2.11.2)

3、 安装ScalaIDE(版本为3.0.4)

第二部分:加压缩官网下载的源代码包或者找到通过Git抽取的Spark源文件:

我用的是spark-1.1.1版本(最新版本),由于idea 13已经原生支持sbt,所以无须为idea安装sbt插件。

源码下载(用git工具):

# Masterdevelopment branch

gitclone git://github.com/apache/spark.git

# 1.1 maintenancebranch with stability fixes on top of Spark 1.1.1

gitclone git://github.com/apache/spark.git -b branch-1.1

源码更新(用git工具同步跟新源码):

gitclone

第三部分:通过sbt工具,构建Scala的Eclipse工程,详细步骤如下所示

1、通过cmd命令进入DOS界面,之后通过cd命令进入源代码项目中,我下载的Spark.1.1.1版本的源代码放在(E:\Spark计算框架的研究\spark_1_1_1_eclipse)文件夹中,之后运行sbt命令,如下所示:

2、运行sbt命令之后,解析编译相关的jar包,并出现sbt命令界面窗口,出现的效果图如下所示,之后运行eclipse命令,sbt对这个工程进行编译,构建Eclipse项目,效果图如下所示:

4、 打开ScalaIDE工具,File à Import à Existing Projects into Workspace à

Next à

选择刚好用sbt工具编译好的Eclipse工程(E:\Spark计算框架的研究\spark_1_1_1_eclipse),如下图所示。

5、 通过上面的操作,就可以将通过sbt工具编译生成的Eclipse项目导入到EclipseIDE开发环境中,效果图如下所示:

错误提示如下所示:我导入的包为,如下文件夹中所示。

(E:\Spark计算框架的研究\spark_1_1_1_eclipse\lib_managed\bundles)

Description Resource Path Location Type

akka-remote_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled

in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-slf4j_2.10-2.2.3-shaded-protobuf.jar is cross-compiled with

an incompatible version of Scala (2.10). In case of errorneous report,

this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-testkit_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

Description Resource Path Location Type

akka-zeromq_2.10-2.2.3-shaded-protobuf.jar is cross-compiled

with an incompatible version of Scala (2.10).

In case of errorneous report, this check can be disabled in the compiler preference page.

spark-core Unknown Scala Classpath Problem

上面这些包兼容性问题还没有解决,修改相应的jar包就可以解决。

spark源码二次开发难吗

spark源码二次开发不难。掌握了源码编译,就具备了对Spark进行二次开发的基本条件了,要修改Spark源码,进行二次开发,那么就得从官网下载指定版本的源码,导入ide开发环境,进行源码的修改。接着修改完了。

3条大神的评论

  • avatar
    访客 2023-03-28 下午 12:30:25

    e 函数包含以下几个重要的组成部分: 再来看看 AggregationIterator#processRow AggregationIterator#pr

  • avatar
    访客 2023-03-28 上午 05:48:44

    用 hash based agg 是如何判断的? 摘自我另一篇文章: 为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为 这样就能进入 HashAggregateExec 的分支 构

  • avatar
    访客 2023-03-28 上午 05:19:36

    eclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。 DeclarativeAggregate 是一类直接由 Cataly

发表评论