FileSourceStrategy, DataSourceStrategy and DataSourceV2Strategy in Spark
2022-07-19 08:20:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.3.0.
It records the roles of, and differences between, the V1 and V2 DataSource and FileSource code paths in Spark, as well as the stronger DataSource V2 JDBC pushdown that appears in Spark 3.3.0.
Analysis
Spark 3.3.0 ships a DS V2 pushdown feature that can push more work down to the source, for example more complex aggregate pushdown and filter pushdown.
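As a quick, illustrative sketch (the connection URL, table and column names below are made up, and whether the aggregate is really pushed depends on the JDBC dialect), a query like the following can have both its filter and its MAX/GROUP BY evaluated by the database rather than by Spark:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().master("local[*]").appName("dsv2-pushdown").getOrCreate()

// Hypothetical JDBC source; aggregate pushdown is opt-in via pushDownAggregate.
val employees = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/demo")
  .option("dbtable", "employees")
  .option("pushDownAggregate", "true")
  .load()

// With DS V2 pushdown, the WHERE clause and the MAX/GROUP BY can be compiled into
// the SQL sent to the database; explain() then reports the pushed filters and
// aggregates on the scan node (e.g. PushedFilters / PushedAggregates).
employees
  .filter("salary > 10000")
  .groupBy("dept")
  .agg(max("salary"))
  .explain()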
- DataSource and FileSource pushdown in V1
Here we have to mention the two V1 rules: DataSourceStrategy and FileSourceStrategy.
FileSourceStrategy mainly targets HDFS files, for example converted Hive tables; concretely, it converts the corresponding LogicalRelation into a FileSourceScanExec:
FileSourceScanExec(
  fsRelation,
  outputAttributes,
  outputSchema,
  partitionKeyFilters.toSeq,
  bucketSet,
  None,
  dataFilters,
  table.map(_.identifier))
The filter-related pieces here are partitionKeyFilters and dataFilters. partitionKeyFilters filter at the partition level, for example selecting only certain partitions, or the partitions involved in dynamic partition pruning; dataFilters are filters on non-partition columns. When the files are read, the corresponding filtering is applied, as follows:
@transient lazy val selectedPartitions: Array[PartitionDirectory] = {
  ...
@transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
  ...
lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  val readRDD = if (bucketedScan) {
    createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
      relation)
  } else {
    createReadRDD(readFile, dynamicallySelectedPartitions, relation)
  }
  sendDriverMetrics()
  readRDD
}
So when the files are actually read, only the selected partitions are scanned, which reduces IO. dataFilters, in turn, are used as follows:
private lazy val pushedDownFilters = {
  val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
  // `dataFilters` should not include any metadata col filters
  // because the metadata struct has been flatted in FileSourceStrategy
  // and thus metadata col filters are invalid to be pushed down
  dataFilters.filterNot(_.references.exists {
    case FileSourceMetadataAttribute(_) => true
    case _ => false
  }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
}
pushedDownFilters are then applied while the data is actually being read; each FileFormat handles them in its own way.
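To see the two kinds of filters side by side, here is a minimal sketch against a small partitioned Parquet table (the path and column names are made up). The predicate on the partition column dt ends up in partitionKeyFilters and only selects directories, while the predicate on id becomes a dataFilter and, after translation, a pushed-down Parquet filter:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("file-source-filters").getOrCreate()
import spark.implicits._

// Write a small table partitioned by dt (hypothetical path).
Seq((1, "a", "2022-07-18"), (2, "b", "2022-07-19"), (3, "c", "2022-07-19"))
  .toDF("id", "name", "dt")
  .write.mode("overwrite").partitionBy("dt").parquet("/tmp/demo_t")

// dt = '2022-07-19' is a partition filter: only that directory is listed and read.
// id > 1 is a data filter: it is translated and pushed into the Parquet reader.
spark.read.parquet("/tmp/demo_t")
  .filter($"dt" === "2022-07-19" && $"id" > 1)
  .explain()
// The FileScan node in the plan reports PartitionFilters and PushedFilters separately.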
As for DataSourceStrategy, it handles data sources defined through the sources API, converting the corresponding LogicalRelation into a RowDataSourceScanExec:
def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
  case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
    pruneFilterProjectRaw(
      l,
      projects,
      filters,
      (requestedColumns, allPredicates, _) =>
        toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
  ...
Inside the pruneFilterProjectRaw method:
val candidatePredicates = filterPredicates.map { _ transform {
  case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
}}

val (unhandledPredicates, pushedFilters, handledFilters) =
  selectFilters(relation.relation, candidatePredicates)
selectFilters converts the corresponding Catalyst Filters into data source Filters:
val scan = RowDataSourceScanExec(
  requestedColumns,
  requestedColumns.toStructType,
  pushedFilters.toSet,
  handledFilters,
  PushedDownOperators(None, None, None, Seq.empty, Seq.empty),
  scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
  relation.relation,
  relation.catalogTable.map(_.identifier))

filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
In this way, pushedFilters are passed into scanBuilder as its arguments, so that filtering can happen at the data source level.
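To see where those pushed filters land on the source side, here is a minimal, hypothetical V1 relation (the class and its data are made up): a BaseRelation mixing in PrunedFilteredScan receives the requested columns and the translated source Filters in buildScan and may use them to skip rows; whatever it ignores is still re-checked by Spark through the FilterExec placed on top of the scan.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical in-memory relation, only to show where pushed filters arrive.
class DemoRelation(override val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType =
    StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))

  private val data = Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))

  // `filters` are the source-level Filters produced by DataSourceStrategy.translateFilter.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val kept = data.filter { row =>
      filters.forall {
        case GreaterThan("id", v: Int) => row.getInt(0) > v
        case _                         => true // unhandled filters are re-evaluated by Spark
      }
    }
    val indices = requiredColumns.map(c => schema.fieldIndex(c))
    sqlContext.sparkContext.parallelize(kept.map(r => Row(indices.map(r.get): _*)))
  }
}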
- DataSource pushdown in V2
In the current implementation, only JDBC-type sources support this richer pushdown, and the implementation differs from V1: DataSourceV2Strategy only converts the logical plan into a physical plan, while the pushdown itself is done in the optimizer rule V2ScanRelationPushDown:
object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper with AliasHelper {
  import DataSourceV2Implicits._

  def apply(plan: LogicalPlan): LogicalPlan = {
    val pushdownRules = Seq[LogicalPlan => LogicalPlan] (
      createScanBuilder,
      pushDownSample,
      pushDownFilters,
      pushDownAggregates,
      pushDownLimits,
      pruneColumns)

    pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
      pushDownRule(newPlan)
    }
  }
This covers the decisions about pushing down filters, aggregates and so on.
Among these, createScanBuilder creates a ScanBuilderHolder, and pushDownFilters calls the pushFilters method, which in turn calls the scanBuilder's pushPredicates method to record the predicates that need to be pushed down. Taking JDBCScanBuilder as an example, the recorded pieces are later consumed when build() is called:
override def build(): Scan = {
  val resolver = session.sessionState.conf.resolver
  val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
  val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)

  // the `finalSchema` is either pruned in pushAggregation (if aggregates are
  // pushed down), or pruned in pruneColumns (in regular column pruning). These
  // two are mutual exclusive.
  // For aggregate push down case, we want to pass down the quoted column lists such as
  // "DEPT","NAME",MAX("SALARY"),MIN("BONUS"), instead of getting column names from
  // prunedSchema and quote them (will become "MAX(SALARY)", "MIN(BONUS)" and can't
  // be used in sql string.
  JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedPredicate,
    pushedAggregateList, pushedGroupBys, tableSample, pushedLimit, sortOrders)
}
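Whether those fields of JDBCScan are actually populated depends on the JDBC reader options; to the best of my knowledge the relevant option names in 3.3 are the ones below (the snippet assumes an existing SparkSession named spark and a made-up connection, so treat it as a sketch rather than an exhaustive list):

// Each option roughly gates one of the pushed-down pieces handed to JDBCScan above:
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/demo")
  .option("dbtable", "employees")
  .option("pushDownPredicate", "true")    // pushedPredicate (filter pushdown, on by default)
  .option("pushDownAggregate", "true")    // pushedAggregateList / pushedGroupBys
  .option("pushDownLimit", "true")        // pushedLimit (and sortOrders for top-N queries)
  .option("pushDownTableSample", "true")  // tableSample
  .load()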
The Scan itself is constructed in the pruneColumns step, and the DataSourceV2Strategy rule then uses that Scan to build the corresponding RDD:
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
      _, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _)) =>
    val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
    if (v1Relation.schema != scan.readSchema()) {
      throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
        scan.readSchema(), v1Relation.schema)
    }
    val rdd = v1Relation.buildScan()
    val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)

    val dsScan = RowDataSourceScanExec(
      output,
      output.toStructType,
      Set.empty,
      pushed.toSet,
      pushedDownOperators,
      unsafeRowRDD,
      v1Relation,
      tableIdentifier = None)
    withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false) :: Nil
  ....
For JDBC, the wrapped Scan is a JDBCScan:

case class JDBCScan(
    relation: JDBCRelation,
    prunedSchema: StructType,
    pushedPredicates: Array[Predicate],
    pushedAggregateColumn: Array[String] = Array(),
    groupByColumns: Option[Array[String]],
    tableSample: Option[TableSampleInfo],
    pushedLimit: Int,
    sortOrders: Array[SortOrder]) extends V1Scan {

  override def readSchema(): StructType = prunedSchema

  override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = context
      override def schema: StructType = prunedSchema
      override def needConversion: Boolean = relation.needConversion
      override def buildScan(): RDD[Row] = {
        val columnList = if (groupByColumns.isEmpty) {
          prunedSchema.map(_.name).toArray
        } else {
          pushedAggregateColumn
        }
        relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
          pushedLimit, sortOrders)
      }
    }.asInstanceOf[T]
  }
As you can see, for JDBC predicates the toV1TableScan method is called to build a TableScan, buildScan is then called to build the RDD, and finally a RowDataSourceScanExec physical plan is constructed. That completes V2 DataSource pushdown.
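JDBC is currently the only built-in source wired into this richer pushdown, but any V2 connector can opt into the same mechanism, because V2ScanRelationPushDown simply probes the ScanBuilder for the pushdown mix-in interfaces. A minimal, hypothetical sketch is shown below (class name and handling logic are made up; only the filter and column-pruning hooks are shown, the Table/Batch plumbing is omitted):

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical ScanBuilder: V2ScanRelationPushDown calls pushFilters/pruneColumns on it
// before build() is invoked.
class DemoScanBuilder(fullSchema: StructType)
  extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {

  private var pushed: Array[Filter] = Array.empty
  private var requiredSchema: StructType = fullSchema

  // Return the filters the source cannot handle; Spark keeps evaluating those itself.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) =
      filters.partition(_.references.forall(fullSchema.fieldNames.contains))
    pushed = supported
    unsupported
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def pruneColumns(requiredSchema: StructType): Unit = this.requiredSchema = requiredSchema

  override def build(): Scan = new Scan {
    override def readSchema(): StructType = requiredSchema
    // A real source would also implement toBatch() and route `pushed` into its readers.
  }
}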
Difference between V1 and V2 DataSource pushdown
Pushdown in V1 builds the corresponding scan together with the physical plan, whereas in V2 it is done separately in the V2ScanRelationPushDown rule; the physical planning stage then only calls the method that builds the RDD.