Spark Miscellany: Why Reuse Exchange and Subquery
2022-07-17 07:17:00 【鸿乃江边鸟】
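Background
This article is based on Spark 3.3.0.
In the Spark source code we occasionally come across the exchangeReuseEnabled and subqueryReuseEnabled settings (backed by the configuration keys spark.sql.exchange.reuse and spark.sql.execution.reuseSubquery). What do they actually do? Let's work it out from the Spark source.
Analysis
exchangeReuseEnabled
In PlanDynamicPruningFilters we can see: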
val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
  plan.exists {
    case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
      left.sameResult(sparkPlan)
    case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
      right.sameResult(sparkPlan)
    case _ => false
  }
if (canReuseExchange) {
  val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
  val mode = broadcastMode(buildKeys, executedPlan.output)
  // plan a broadcast exchange of the build side of the join
  val exchange = BroadcastExchangeExec(mode, executedPlan)
  val name = s"dynamicpruning#${exprId.id}"
  // place the broadcast adaptor for reusing the broadcast results on the probe side
  val broadcastValues =
    SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
  DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
  // ... (remaining branches omitted)
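Only when Exchange reuse is enabled and the physical plan contains a BroadcastHashJoinExec whose build side produces the same result as the pruning subquery does dynamic partition pruning take this broadcast-based path, which is where it pays off most. Not much happens here, though: the code merely creates a BroadcastExchangeExec. From this snippet alone it is not at all clear where Exchange reuse comes in.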
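The answer lies in the rule ReuseExchangeAndSubquery, which performs the replacement: whenever an identical Exchange already exists in the plan, the duplicate is replaced with a reference to it. That is how the BroadcastExchangeExec behind DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) in the pruning code above ends up being reused.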
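The replacement step can be pictured with a small toy model. This is only a sketch under stated assumptions: Plan, Scan, Exchange, ReusedExchange, Join and reuseExchanges are hypothetical names invented for illustration, not Spark's classes, and case-class equality stands in for Spark's comparison of canonicalized plans.

```scala
import scala.collection.mutable

object ReuseSketch {
  // Toy plan nodes (hypothetical; not Spark's real physical operators).
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Exchange(child: Plan) extends Plan
  final case class ReusedExchange(original: Exchange) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // Replace every Exchange that is structurally equal to one seen earlier
  // with a ReusedExchange that points at the first occurrence.
  // Simplification: exchanges nested inside another Exchange are not visited.
  def reuseExchanges(plan: Plan): Plan = {
    val seen = mutable.Map.empty[Exchange, Exchange]

    def go(p: Plan): Plan = p match {
      case e: Exchange =>
        seen.get(e) match {
          case Some(first) => ReusedExchange(first) // duplicate: reference the first one
          case None =>
            seen(e) = e                             // first occurrence: remember it
            e
        }
      case Join(l, r) =>
        val newLeft = go(l)                         // visit the left side first
        Join(newLeft, go(r))
      case other => other
    }

    go(plan)
  }
}
```

In this toy, a plan that contains the same Exchange(Scan("dim")) twice keeps the first copy and turns the second into a ReusedExchange, so the underlying work is planned only once.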
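But why does reusing a BroadcastExchangeExec reduce the amount of computation Spark performs?
Let's stay with DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) as the example:
In InSubqueryExec, broadcastValues is of type SubqueryBroadcastExec, and the computation in SubqueryBroadcastExec looks like this: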
@transient
private lazy val relationFuture: Future[Array[InternalRow]] = {
  // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  Future {
    // This will run in another thread. Set the execution id so that we can connect these jobs
    // with the correct execution.
    SQLExecution.withExecutionId(session, executionId) {
      val beforeCollect = System.nanoTime()
      val broadcastRelation = child.executeBroadcast[HashedRelation]().value
      // ...
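As you can see, relationFuture is declared as a lazy val, so no matter how many times it is referenced it is initialized only once, which reduces the amount of work done on the driver. Its initialization is triggered in: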
protected override def doPrepare(): Unit = {
  relationFuture
}
doPrepare in turn is invoked (through prepare()) from the executeQuery method, on the driver, when the RDD is being built:
protected final def executeQuery[T](query: => T): T = {
  RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
    prepare()
    waitForSubqueries()
    query
  }
}
In the line val broadcastRelation = child.executeBroadcast[HashedRelation]().value, child is a BroadcastExchangeExec, so this call is what triggers the broadcast.
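The effect of combining a lazy val with node reuse can be checked in isolation. The following is a minimal sketch, not Spark code: ToyBroadcastNode and the builds counter are hypothetical, and the Future body merely stands in for the expensive broadcast.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LazyBroadcastSketch {
  // Counts how many times the expensive "broadcast" body actually runs.
  val builds = new AtomicInteger(0)

  // Hypothetical stand-in for a node such as SubqueryBroadcastExec.
  final class ToyBroadcastNode(rows: Seq[Int]) {
    // lazy val: the Future is created (and its body started) on first access only.
    private lazy val relationFuture: Future[Seq[Int]] = Future {
      builds.incrementAndGet()
      rows.distinct
    }

    // Mirrors doPrepare(): touching the lazy val kicks off the computation.
    def prepare(): Unit = { relationFuture; () }

    // Blocks until the shared result is available.
    def result(): Seq[Int] = Await.result(relationFuture, 10.seconds)
  }
}
```

If a single ToyBroadcastNode instance is shared by several consumers, builds stays at 1 however often prepare() and result() are called. Two separate instances over the same rows each run the body once, which is the duplicated work that reuse avoids.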
subqueryReuseEnabled
Take InjectRuntimeFilter as an example. This rule ultimately produces the following logical plan:
val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
  ListQuery(aggregate, childOutputs = aggregate.output))
Filter(filter, filterApplicationSidePlan)
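The PlanSubqueries rule then turns an InSubquery into a physical plan as shown below (the InSubquery built by InjectRuntimeFilter does not actually end up as an InSubqueryExec; it serves here only as an illustration):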
val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, query)
InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
For InSubqueryExec:
def updateResult(): Unit = {
  val rows = plan.executeCollect()
  result = if (plan.output.length > 1) {
    rows.asInstanceOf[Array[Any]]
  } else {
    rows.map(_.get(0, child.dataType))
  }
  if (shouldBroadcast) {
    resultBroadcast = plan.session.sparkContext.broadcast(result)
  }
}
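The updateResult method is likewise called from within executeQuery (again on the driver). In plan.executeCollect(), plan is of type BaseSubqueryExec, and its implementations declare relationFuture as follows (taking SubqueryBroadcastExec as an example):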
private lazy val relationFuture
This too is a lazy val and is therefore initialized only once. So when ReuseExchangeAndSubquery reuses a BaseSubqueryExec, the amount of computation on the driver is reduced.
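The order of calls inside executeQuery (prepare, then waitForSubqueries, then the query body) can be sketched with a toy operator. All names here (ToySubquery, ToyOperator, log) are hypothetical and heavily simplified; the sketch assumes, as a simplification, that waiting for subqueries means calling updateResult() on each of them on the driver.

```scala
import scala.collection.mutable.ArrayBuffer

object ExecuteQuerySketch {
  // Records the order in which the driver-side steps run.
  val log = ArrayBuffer.empty[String]

  // Hypothetical stand-in for a subquery expression such as InSubqueryExec.
  final class ToySubquery(name: String, rows: Seq[Int]) {
    private var result: Option[Seq[Int]] = None

    // Collects the subquery result on the driver.
    def updateResult(): Unit = {
      log += s"collect $name on driver"
      result = Some(rows)
    }

    def values: Seq[Int] =
      result.getOrElse(sys.error(s"subquery $name not materialized"))
  }

  // Hypothetical operator whose filter depends on subquery results.
  final class ToyOperator(subqueries: Seq[ToySubquery]) {
    private var prepared = false

    private def prepare(): Unit =
      if (!prepared) { prepared = true; log += "prepare" }

    private def waitForSubqueries(): Unit = subqueries.foreach(_.updateResult())

    // Same shape as the executeQuery excerpt: prepare, wait, then run the body.
    def executeQuery[T](query: => T): T = {
      prepare()
      waitForSubqueries()
      query
    }

    // Keeps only the input values that appear in some subquery result.
    def execute(input: Seq[Int]): Seq[Int] = executeQuery {
      log += "build result"
      val allowed = subqueries.flatMap(_.values).toSet
      input.filter(allowed.contains)
    }
  }
}
```

Because the body passed to executeQuery is a by-name parameter, it runs only after every subquery has been collected, which is why the subquery values are already available when the filter is evaluated.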