FileSourceStrategy, DataSourceStrategy and DataSourceV2Strategy in Spark
2022-07-17 07:17:00 【鸿乃江边鸟】
Background
This article is based on Spark 3.3.0.
It documents the roles of and differences between the V1 and V2 Datasource and the FileSource in Spark, as well as the stronger Datasource v2 JDBC pushdown that appeared in Spark 3.3.0.
Analysis
Spark 3.3.0 introduces the DS V2 push down feature, which enables better pushdown, for example more complex aggregate pushdown and filter pushdown.
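As a quick illustration of what this enables, here is a minimal sketch using the v2 JDBC catalog (the catalog name h2, the URL, and the table are hypothetical; pushDownAggregate is the JDBC option that turns on aggregate pushdown):
// Register a DS v2 JDBC catalog backed by (for example) an in-memory H2 database.
spark.conf.set("spark.sql.catalog.h2",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb") // hypothetical URL
spark.conf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")
spark.conf.set("spark.sql.catalog.h2.pushDownAggregate", "true") // enable aggregate pushdown

// With DS v2 pushdown the aggregation can run inside the database itself;
// the plan should report something like PushedAggregates: [MAX(SALARY)].
spark.sql("SELECT DEPT, MAX(SALARY) FROM h2.test.employee GROUP BY DEPT").explain()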
- DataSource and FileSource pushdown in V1
This brings us to the two V1 rules, DataSourceStrategy and FileSourceStrategy.
FileSourceStrategy mainly targets HDFS files, for example the conversion of Hive tables;
it converts the corresponding LogicalRelation into a FileSourceScanExec:
FileSourceScanExec(
  fsRelation,
  outputAttributes,
  outputSchema,
  partitionKeyFilters.toSeq,
  bucketSet,
  None,
  dataFilters,
  table.map(_.identifier))
The filter-related parts are partitionKeyFilters and dataFilters. partitionKeyFilters apply at the partition level, for example selecting a single partition or the partitions involved in dynamic partition pruning, while dataFilters apply to non-partition columns. The corresponding filtering then happens when the files are read, as follows:
@transient lazy val selectedPartitions: Array[PartitionDirectory] = {
  ...
}

@transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
  ...
}

lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  val readRDD = if (bucketedScan) {
    createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
      relation)
  } else {
    createReadRDD(readFile, dynamicallySelectedPartitions, relation)
  }
  sendDriverMetrics()
  readRDD
}
This way only the matching partitions are read when scanning files, which reduces IO. dataFilters, in turn, come into play in pushedDownFilters:
private lazy val pushedDownFilters = {
  val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
  // `dataFilters` should not include any metadata col filters
  // because the metadata struct has been flatted in FileSourceStrategy
  // and thus metadata col filters are invalid to be pushed down
  dataFilters.filterNot(_.references.exists {
    case FileSourceMetadataAttribute(_) => true
    case _ => false
  }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
}
pushedDownFilters are then applied when the data is actually read; different FileFormats handle them in different ways.
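As an illustration, here is a minimal sketch (the path is hypothetical) that shows both filter kinds landing in the physical plan of a partitioned parquet table:
// Write a partitioned parquet table, then filter on a partition column and a data column.
spark.range(100).selectExpr("id", "id % 10 AS part")
  .write.partitionBy("part").parquet("/tmp/demo_tbl")

spark.read.parquet("/tmp/demo_tbl")
  .where("part = 3 AND id > 50")
  .explain()
// In the FileScan node, `part = 3` surfaces via partitionKeyFilters and `id > 50`
// via dataFilters/pushedDownFilters, printed roughly as:
//   PartitionFilters: [isnotnull(part#...), (part#... = 3)]
//   PushedFilters: [IsNotNull(id), GreaterThan(id,50)]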
DataSourceStrategy, by contrast, handles Data Sources defined through the source API, converting the corresponding LogicalRelation into a RowDataSourceScanExec:
def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
  case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
    pruneFilterProjectRaw(
      l,
      projects,
      filters,
      (requestedColumns, allPredicates, _) =>
        toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
  ...
Inside pruneFilterProjectRaw we find:
val candidatePredicates = filterPredicates.map { _ transform {
  case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
}}

val (unhandledPredicates, pushedFilters, handledFilters) =
  selectFilters(relation.relation, candidatePredicates)
selectFilters translates the corresponding Catalyst filters into data source Filters, and the scan is then built from them:
val scan = RowDataSourceScanExec(
  requestedColumns,
  requestedColumns.toStructType,
  pushedFilters.toSet,
  handledFilters,
  PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
  scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
  relation.relation,
  relation.catalogTable.map(_.identifier))

filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
In this way pushedFilters is passed into scanBuilder as an argument, so the filtering happens at the data source level. A minimal sketch of such a source follows.
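To make the hand-off concrete, here is a minimal sketch of a V1 source built on the public Filter API (the names DemoSource and DemoRelation are hypothetical). The CatalystScan case quoted above receives raw Catalyst predicates; the more common PrunedFilteredScan shown here receives the translated source Filters instead:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan, RelationProvider}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class DemoSource extends RelationProvider {
  override def createRelation(ctx: SQLContext, params: Map[String, String]): BaseRelation =
    new DemoRelation(ctx)
}

class DemoRelation(ctx: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def sqlContext: SQLContext = ctx
  override def schema: StructType =
    StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))

  // `filters` are exactly the source Filters chosen by selectFilters above.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val rows: Seq[Map[String, Any]] =
      Seq(Map("id" -> 1, "name" -> "a"), Map("id" -> 2, "name" -> "b"))
    val kept = rows.filter { rec =>
      filters.forall {
        case EqualTo("id", v) => rec("id") == v // handled here, at the source
        case _                => true           // unhandled filters are re-applied by Spark in FilterExec
      }
    }
    ctx.sparkContext.parallelize(kept.map(rec => Row(requiredColumns.map(rec): _*)))
  }
}
A query such as spark.read.format("<package>.DemoSource").load().where("id = 1") would then reach buildScan with EqualTo("id", 1) among the filters.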
- Datasource pushdown in V2
Judging from the current implementation, only JDBC supports this kind of pushdown. The implementation differs from V1: DataSourceV2Strategy only performs the conversion to a physical plan, while the pushdown itself happens in the optimizer rule V2ScanRelationPushDown:
object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper with AliasHelper {
  import DataSourceV2Implicits._

  def apply(plan: LogicalPlan): LogicalPlan = {
    val pushdownRules = Seq[LogicalPlan => LogicalPlan] (
      createScanBuilder,
      pushDownSample,
      pushDownFilters,
      pushDownAggregates,
      pushDownLimits,
      pruneColumns)

    pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
      pushDownRule(newPlan)
    }
  }
This is where the filter, aggregate and other pushdown decisions are made.
createScanBuilder creates a ScanBuilderHolder; pushDownFilters then calls pushFilters, which invokes the scanBuilder's pushPredicates method so that the predicates to be pushed down are recorded, as in JDBCScanBuilder, whose build() then carries them into the scan:
override def build(): Scan = {
  val resolver = session.sessionState.conf.resolver
  val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
  val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
  // the `finalSchema` is either pruned in pushAggregation (if aggregates are
  // pushed down), or pruned in pruneColumns (in regular column pruning). These
  // two are mutual exclusive.
  // For aggregate push down case, we want to pass down the quoted column lists such as
  // "DEPT","NAME",MAX("SALARY"),MIN("BONUS"), instead of getting column names from
  // prunedSchema and quote them (will become "MAX(SALARY)", "MIN(BONUS)" and can't
  // be used in sql string.
  JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedPredicate,
    pushedAggregateList, pushedGroupBys, tableSample, pushedLimit, sortOrders)
}
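For sources other than JDBC, the same hook can be implemented directly. Below is a minimal sketch (DemoScanBuilder is a hypothetical name) of the SupportsPushDownV2Filters interface whose pushPredicates method the pushDownFilters step ends up calling:
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}

class DemoScanBuilder extends ScanBuilder with SupportsPushDownV2Filters {
  private var pushed: Array[Predicate] = Array.empty

  // Called during pushDownFilters: remember what this source will evaluate and
  // return the predicates it cannot handle (Spark keeps a Filter node for those).
  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    pushed = predicates // pretend the source can evaluate everything
    Array.empty         // nothing left over for Spark to re-apply
  }

  // Reported back so the plan (and explain output) records what was pushed.
  override def pushedPredicates(): Array[Predicate] = pushed

  // build() would return a Scan that applies `pushed` at the source, just as
  // JDBCScanBuilder.build above hands pushedPredicate to JDBCScan.
  override def build(): Scan = throw new UnsupportedOperationException("sketch only")
}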
The scan itself is built in the pruneColumns step; the DataSourceV2Strategy rule then uses that scan to build the corresponding RDD:
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
      _, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _)) =>
    val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
    if (v1Relation.schema != scan.readSchema()) {
      throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
        scan.readSchema(), v1Relation.schema)
    }
    val rdd = v1Relation.buildScan()
    val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)

    val dsScan = RowDataSourceScanExec(
      output,
      output.toStructType,
      Set.empty,
      pushed.toSet,
      pushedDownOperators,
      unsafeRowRDD,
      v1Relation,
      tableIdentifier = None)
    withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil
  ...

The JDBCScan used above implements V1Scan:
case class JDBCScan(
    relation: JDBCRelation,
    prunedSchema: StructType,
    pushedPredicates: Array[Predicate],
    pushedAggregateColumn: Array[String] = Array(),
    groupByColumns: Option[Array[String]],
    tableSample: Option[TableSampleInfo],
    pushedLimit: Int,
    sortOrders: Array[SortOrder]) extends V1Scan {

  override def readSchema(): StructType = prunedSchema

  override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = context
      override def schema: StructType = prunedSchema
      override def needConversion: Boolean = relation.needConversion
      override def buildScan(): RDD[Row] = {
        val columnList = if (groupByColumns.isEmpty) {
          prunedSchema.map(_.name).toArray
        } else {
          pushedAggregateColumn
        }
        relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
          pushedLimit, sortOrders)
      }
    }.asInstanceOf[T]
  }
}
So for JDBC predicate pushdown, toV1TableScan is called to build a TableScan, buildScan then builds the RDD, and finally the RowDataSourceScanExec physical plan is constructed; this completes the V2 Datasource pushdown.
Differences between V1 and V2 Datasource pushdown
In V1 the pushdown is produced together with the corresponding scan physical plan; in V2 it is built separately in the V2ScanRelationPushDown rule, and the physical planning phase only calls the method that produces the RDD.