文章目录:
- 1、spark thrift server 与 网易 kyuubi thrift server
- 2、可能是全网最详细的 Spark Sql Aggregate 源码剖析
- 3、怎么用Eclipse搭建Spark源码阅读环境
- 4、spark源码二次开发难吗
spark thrift server 与 网易 kyuubi thrift server
thrift server可以实现通过jdbc, beeline等工具,实现连接到spark集群,并提交sql查询的机制。
默认情况下,cdh安装的spark没有包含thrift server模块,因此我们需要重新编译spark。
另外,为了不影响cdh自带的spark,而且spark目前都是基于yarn运行的,本身也没有什么独立的服务部署(除了history sever)。
所以,在一个集群中,可以部署安装多个版本的spark。
我们使用源码编译的spark 2.4.0(其中hive的版本是1.2.1)
cdh集成的spark版本和Hive版本如下:
使用jdk1.8
修改spark提供的mvn,使用自行安装的maven 3.8.1
使用make-distribution.sh可以帮助与我们编译之后打包成tgz文件
修改pom.xml文件的配置如下。
最后,执行编译命令如下:
这样打出的包,就含有thrift server的jar包了。
最终打包文件,根目录下。
之后就是解压到其他目录下后即可。
将hive-site.xml的文件连接过来,这样spark就可以读取hive的表了。
为了确保spark提交到yarn上运行,需要配置
cp spark-defaults.conf.template spar-defaults.conf
另外,可以在spark-env.sh中设置环境变量。
HADOOP_CONF_DIR
环境变量,也可以在/etc/profile中设置
启动日志可以查看,注意下端口占用问题,如下。
启动时候,使用beeline工具连接上,主要这里不用使用cdh默认安装hive提供的beeline工具,应为版本太高。
使用编译后spark生成beeline工具
参考beeline使用教程。
kyuubi是基于thrift sever二次开发,在系能和安全上优于thrift server。
鉴于目前hive的版本是2.1,而最新的kyuubi的hive是2.3,所以采用前天版本的kyuubi,采用0.7版本,保证hive的版本小于当前集群中的hive版本。
使用build目录下的dist脚本进行编译和打包。
编译成功后,会在更目录下出现tar.gz的压缩文件,如上图。
之后解压到目录下。
配置bin/kyuubi-env.sh脚本,设置spark路径
执行bin/start-kyuubi.sh命令即可。
访问的方式同样采用beelin,注意使用上面章节的beeline工具。
访问后,可以通过beeline访问到hive的表(在spark中已经配置了hive-site.xml)
!connect jdbc: hive2://xxxx:10009 即可。
可能是全网最详细的 Spark Sql Aggregate 源码剖析
纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。
本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的
创建聚合分为两个阶段:
AggregateExpression 共有以下几种 mode:
Q:是否支持使用 hash based agg 是如何判断的?
摘自我另一篇文章:
为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为
这样就能进入 HashAggregateExec 的分支
构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化
在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:
可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。
此迭代器:
注:UnsafeKVExternalSorter 的实现可以参考:
UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:
使用 UnsafeRow 的收益:
构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现
上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:
上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类
AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。
DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:
我们再次以容易理解的 Count 来举例说明:
通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:
再来看看 AggregationIterator#processRow
AggregationIterator#processRow 会调用
生成用于处理一行数据(row)的函数
说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:
比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:
对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:
对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:
对于 Final 的 Collect 来说就是调用其 merge 方法,即:
我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。
分为两种情况,分别是:
Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~
怎么用Eclipse搭建Spark源码阅读环境
第一部分、软件安装
1、 安装JDK (版本为1.7.0_11)
2、 安装Scala (版本为2.11.2)
3、 安装ScalaIDE(版本为3.0.4)
第二部分:加压缩官网下载的源代码包或者找到通过Git抽取的Spark源文件:
我用的是spark-1.1.1版本(最新版本),由于idea 13已经原生支持sbt,所以无须为idea安装sbt插件。
源码下载(用git工具):
# Masterdevelopment branch
gitclone git://github.com/apache/spark.git
# 1.1 maintenancebranch with stability fixes on top of Spark 1.1.1
gitclone git://github.com/apache/spark.git -b branch-1.1
源码更新(用git工具同步跟新源码):
gitclone
第三部分:通过sbt工具,构建Scala的Eclipse工程,详细步骤如下所示
1、通过cmd命令进入DOS界面,之后通过cd命令进入源代码项目中,我下载的Spark.1.1.1版本的源代码放在(E:\Spark计算框架的研究\spark_1_1_1_eclipse)文件夹中,之后运行sbt命令,如下所示:
2、运行sbt命令之后,解析编译相关的jar包,并出现sbt命令界面窗口,出现的效果图如下所示,之后运行eclipse命令,sbt对这个工程进行编译,构建Eclipse项目,效果图如下所示:
4、 打开ScalaIDE工具,File à Import à Existing Projects into Workspace à
Next à
选择刚好用sbt工具编译好的Eclipse工程(E:\Spark计算框架的研究\spark_1_1_1_eclipse),如下图所示。
5、 通过上面的操作,就可以将通过sbt工具编译生成的Eclipse项目导入到EclipseIDE开发环境中,效果图如下所示:
错误提示如下所示:我导入的包为,如下文件夹中所示。
(E:\Spark计算框架的研究\spark_1_1_1_eclipse\lib_managed\bundles)
Description Resource Path Location Type
akka-remote_2.10-2.2.3-shaded-protobuf.jar is cross-compiled
with an incompatible version of Scala (2.10).
In case of errorneous report, this check can be disabled
in the compiler preference page.
spark-core Unknown Scala Classpath Problem
Description Resource Path Location Type
akka-slf4j_2.10-2.2.3-shaded-protobuf.jar is cross-compiled with
an incompatible version of Scala (2.10). In case of errorneous report,
this check can be disabled in the compiler preference page.
spark-core Unknown Scala Classpath Problem
Description Resource Path Location Type
akka-testkit_2.10-2.2.3-shaded-protobuf.jar is cross-compiled
with an incompatible version of Scala (2.10).
In case of errorneous report, this check can be disabled in the compiler preference page.
spark-core Unknown Scala Classpath Problem
Description Resource Path Location Type
akka-zeromq_2.10-2.2.3-shaded-protobuf.jar is cross-compiled
with an incompatible version of Scala (2.10).
In case of errorneous report, this check can be disabled in the compiler preference page.
spark-core Unknown Scala Classpath Problem
上面这些包兼容性问题还没有解决,修改相应的jar包就可以解决。
spark源码二次开发难吗
spark源码二次开发不难。掌握了源码编译,就具备了对Spark进行二次开发的基本条件了,要修改Spark源码,进行二次开发,那么就得从官网下载指定版本的源码,导入ide开发环境,进行源码的修改。接着修改完了。
e 函数包含以下几个重要的组成部分: 再来看看 AggregationIterator#processRow AggregationIterator#pr
用 hash based agg 是如何判断的? 摘自我另一篇文章: 为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为 这样就能进入 HashAggregateExec 的分支 构
eclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。 DeclarativeAggregate 是一类直接由 Cataly