FileSourceStrategy, DataSourceStrategy and DataSourceV2Strategy in Spark
2022-07-19 08:20:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.3.0.

It records the roles of, and differences between, the V1 and V2 DataSource and FileSource code paths in Spark, as well as the stronger DataSource V2 JDBC pushdown that appears in Spark 3.3.0.
Analysis
Spark 3.3.0 ships the DS V2 pushdown framework, which can push more operators down to the source, for example more complex aggregate pushdown and filter pushdown.
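As a quick illustration (a hedged sketch: the employees table and its columns are hypothetical, and spark is assumed to be an active SparkSession), this is the kind of query shape whose filter and aggregate can both be pushed to the source:

// With a DS V2 source that supports it, both the WHERE predicate and the
// MAX/GROUP BY below can be evaluated inside the source rather than in Spark.
spark.sql(
  """SELECT dept, MAX(salary)
    |FROM employees
    |WHERE bonus > 1000
    |GROUP BY dept""".stripMargin).explain()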
- DataSource and FileSource pushdown in V1
Here we have to mention the two V1 rules, DataSourceStrategy and FileSourceStrategy.

FileSourceStrategy mainly targets files on HDFS, for instance the scans produced when Hive tables are converted. Concretely, it converts the corresponding LogicalRelation into a FileSourceScanExec:
FileSourceScanExec(
  fsRelation,
  outputAttributes,
  outputSchema,
  partitionKeyFilters.toSeq,
  bucketSet,
  None,
  dataFilters,
  table.map(_.identifier))
The filter-related pieces here are partitionKeyFilters and dataFilters. partitionKeyFilters works at the partition level, for example selecting only certain partitions, or the partitions involved in dynamic partition pruning; dataFilters filters on non-partition columns. When the files are read, the corresponding filtering is applied, as follows:
@transient lazy val selectedPartitions: Array[PartitionDirectory] = {
  ...

@transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
  ...

lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  val readRDD = if (bucketedScan) {
    createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
      relation)
  } else {
    createReadRDD(readFile, dynamicallySelectedPartitions, relation)
  }
  sendDriverMetrics()
  readRDD
}
So when reading files, only the selected partitions are read, which reduces IO. dataFilters is used as follows:
private lazy val pushedDownFilters = {
  val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
  // `dataFilters` should not include any metadata col filters
  // because the metadata struct has been flatted in FileSourceStrategy
  // and thus metadata col filters are invalid to be pushed down
  dataFilters.filterNot(_.references.exists {
    case FileSourceMetadataAttribute(_) => true
    case _ => false
  }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
}
pushedDownFilters is applied when the data is actually read, and each FileFormat handles the pushed filters in its own way.
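Both kinds of filters are visible from the user side. A hedged sketch (the path, partition layout, and column names are all made up, and spark is an active SparkSession): for a Parquet table partitioned by dt, a filter on dt becomes a partition filter, while a filter on an ordinary column is translated and pushed to the reader.

import org.apache.spark.sql.functions.col

// Assumed directory layout: /tmp/events/dt=2022-07-19/part-*.parquet
val events = spark.read.parquet("/tmp/events")
events.filter(col("dt") === "2022-07-19" && col("status") === "ok").explain()
// The FileScan node in the plan output shows something like:
//   PartitionFilters: [isnotnull(dt), (dt = 2022-07-19)]
//   PushedFilters: [IsNotNull(status), EqualTo(status,ok)]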
As for DataSourceStrategy, it handles data sources defined through the V1 sources API, converting the corresponding LogicalRelation into a RowDataSourceScanExec:
def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
  case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
    pruneFilterProjectRaw(
      l,
      projects,
      filters,
      (requestedColumns, allPredicates, _) =>
        toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
  ...
Inside the pruneFilterProjectRaw method we have:
val candidatePredicates = filterPredicates.map { _ transform {
  case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
}}

val (unhandledPredicates, pushedFilters, handledFilters) =
  selectFilters(relation.relation, candidatePredicates)
selectFilters converts the Catalyst filters into data source Filters:
val scan = RowDataSourceScanExec(
  requestedColumns,
  requestedColumns.toStructType,
  pushedFilters.toSet,
  handledFilters,
  PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
  scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
  relation.relation,
  relation.catalogTable.map(_.identifier))

filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
In this way pushedFilters is passed into scanBuilder as an argument, so the filtering can be done at the data source level, as the sketch right after this paragraph illustrates.
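For a concrete feel of the V1 contract, here is a minimal sketch of a custom relation implementing PrunedFilteredScan; everything below is a hypothetical illustration, not Spark source. DataSourceStrategy hands the pruned columns and the translated Filters to buildScan:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical in-memory relation: buildScan receives the requested columns
// and the pushed Filters, so the source can skip rows and columns itself.
class DemoRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  private val rows = Seq(Row("a", 1), Row("b", 2))

  override def schema: StructType =
    StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Evaluate the pushed filters at the source (only EqualTo handled here;
    // anything unhandled is re-checked by Spark in a FilterExec above the scan).
    val kept = rows.filter { row =>
      filters.forall {
        case EqualTo(attr, value) => row.get(schema.fieldIndex(attr)) == value
        case _ => true
      }
    }
    // Prune to the requested columns.
    val pruned = kept.map(r => Row.fromSeq(requiredColumns.toSeq.map(c => r.get(schema.fieldIndex(c)))))
    sqlContext.sparkContext.parallelize(pruned)
  }
}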
- DataSource pushdown in V2
At present only JDBC sources implement this pushdown, and the implementation differs from V1: DataSourceV2Strategy only performs the physical plan conversion, while the pushdown itself is done in the optimizer rule V2ScanRelationPushDown:
object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper with AliasHelper {
  import DataSourceV2Implicits._

  def apply(plan: LogicalPlan): LogicalPlan = {
    val pushdownRules = Seq[LogicalPlan => LogicalPlan] (
      createScanBuilder,
      pushDownSample,
      pushDownFilters,
      pushDownAggregates,
      pushDownLimits,
      pruneColumns)

    pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
      pushDownRule(newPlan)
    }
  }
This covers the pushdown decisions for filters, aggregates, and so on. Among these rules, createScanBuilder creates a ScanBuilderHolder; pushDownFilters then calls the pushFilters method, which in turn calls the scanBuilder's pushPredicates method to record the predicates that need to be pushed down (a minimal custom ScanBuilder sketch follows the snippet below). Take JDBCScanBuilder for example:
override def build(): Scan = {
  val resolver = session.sessionState.conf.resolver
  val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
  val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
  // the `finalSchema` is either pruned in pushAggregation (if aggregates are
  // pushed down), or pruned in pruneColumns (in regular column pruning). These
  // two are mutual exclusive.
  // For aggregate push down case, we want to pass down the quoted column lists such as
  // "DEPT","NAME",MAX("SALARY"),MIN("BONUS"), instead of getting column names from
  // prunedSchema and quote them (will become "MAX(SALARY)", "MIN(BONUS)" and can't
  // be used in sql string.
  JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedPredicate,
    pushedAggregateList, pushedGroupBys, tableSample, pushedLimit, sortOrders)
}
The Scan itself is built in the pruneColumns step, and the DataSourceV2Strategy rule then uses this Scan to build the corresponding RDD:
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
      _, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _)) =>
    val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
    if (v1Relation.schema != scan.readSchema()) {
      throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
        scan.readSchema(), v1Relation.schema)
    }
    val rdd = v1Relation.buildScan()
    val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)

    val dsScan = RowDataSourceScanExec(
      output,
      output.toStructType,
      Set.empty,
      pushed.toSet,
      pushedDownOperators,
      unsafeRowRDD,
      v1Relation,
      tableIdentifier = None)
    withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil
....

The JDBCScan referenced above is defined as follows:
case class JDBCScan(
    relation: JDBCRelation,
    prunedSchema: StructType,
    pushedPredicates: Array[Predicate],
    pushedAggregateColumn: Array[String] = Array(),
    groupByColumns: Option[Array[String]],
    tableSample: Option[TableSampleInfo],
    pushedLimit: Int,
    sortOrders: Array[SortOrder]) extends V1Scan {

  override def readSchema(): StructType = prunedSchema

  override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = context
      override def schema: StructType = prunedSchema
      override def needConversion: Boolean = relation.needConversion
      override def buildScan(): RDD[Row] = {
        val columnList = if (groupByColumns.isEmpty) {
          prunedSchema.map(_.name).toArray
        } else {
          pushedAggregateColumn
        }
        relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
          pushedLimit, sortOrders)
      }
    }.asInstanceOf[T]
  }
So for JDBC, the pushed predicates take effect by calling the toV1TableScan method to build a TableScan, then calling buildScan to build the RDD, and finally building the RowDataSourceScanExec physical plan. That completes the V2 DataSource pushdown.
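From the user's point of view, this path is driven by JDBC data source options. A hedged sketch (the URL, table, and column names are placeholders, and spark is an active SparkSession): with pushDownAggregate enabled, explain() shows entries such as PushedAggregates and PushedGroupByColumns on the scan node.

import org.apache.spark.sql.functions.max

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db") // placeholder connection
  .option("dbtable", "employees")                  // placeholder table
  .option("pushDownAggregate", "true")
  .load()

// The MAX/GROUP BY can now be compiled into the SQL sent to the database.
jdbcDF.groupBy("dept").agg(max("salary")).explain()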
The difference between V1 and V2 DataSource pushdown
In V1, the pushdown happens while the corresponding scan is built, together with physical plan generation; in V2 the scan is built separately in the V2ScanRelationPushDown rule, and the physical plan generation stage only calls the methods that generate the RDD.