Spark 3.x from Beginner to Expert - Stage 6 (Advanced RDD Operators Explained & Illustrated, plus Shuffle Tuning)
2022-07-19 07:43:00 【Top master cultivation plan】
Detailed explanation of advanced operators
mapPartitions
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData: Array[Int] = Array(1, 2, 3)
val sc = new SparkContext(conf)
// Split the data into 2 partitions
val initDataRDD: RDD[Int] = sc.parallelize(targetData,2)
// mapPartitions processes a whole partition at a time; iter is the data of that partition
// This is fine when each partition is small, but with a lot of data per partition it can easily cause an OOM
// The return value must also be an Iterator
val res: RDD[Int] = initDataRDD.mapPartitions(iter => {
iter.take(1)
})
res.collect().foreach(println)
sc.stop()
}
}
mapPartitionsWithIndex
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData: Array[Int] = Array(1, 2, 3)
val sc = new SparkContext(conf)
val initDataRDD: RDD[Int] = sc.parallelize(targetData,2)
// Compared with mapPartitions, this also exposes the partition index: the first parameter is the partition index
// The second parameter is the data of that partition
// The return value is again an Iterator
// Here we take the first value of each partition
val res: RDD[(Int, Int)] = initDataRDD.mapPartitionsWithIndex((index, iter) => {
Array((index, iter.take(1).next())).iterator
})
res.collect().foreach(item=>{
println(item)
})
sc.stop()
}
}
result
(0,1)
(1,2)
sample
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData: Array[Int] = Array(1, 2, 3)
val sc = new SparkContext(conf)
val initDataRDD: RDD[Int] = sc.parallelize(targetData,2)
// Draw a random sample, driven by a random seed
// The first parameter sets whether sampling is done with replacement; true means a drawn element is put back into the set
// The second parameter is the sampling fraction
// The third parameter is the random seed; if it is not set, a random seed is used
initDataRDD.sample(false,0.5,9).collect().foreach(println)
sc.stop()
}
}
result
2
3
groupByKey
Illustration

- groupByKey is a shuffle operation: a ShuffledRDD is created implicitly. It reads the upstream data by key and brings records with the same key together; an internal map inside groupByKey then puts all values of the same key into one collection, as shown in the sketch below.
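A minimal groupByKey sketch (the object name and the sample key/value data are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByKeyDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)), 2)
    // After the shuffle, all values with the same key end up in one collection
    pairs.groupByKey().collect().foreach(println)
    // Expected output (order may vary): (a,CompactBuffer(1, 3)) and (b,CompactBuffer(2))
    sc.stop()
  }
}
```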
reduceByKey
Illustration
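A minimal reduceByKey sketch (the object name and sample data are illustrative); unlike groupByKey, reduceByKey pre-aggregates inside each partition before the shuffle:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReduceByKeyDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)), 2)
    // Values of the same key are combined map-side first, then merged after the shuffle
    pairs.reduceByKey(_ + _).collect().foreach(println)
    // Expected output (order may vary): (a,4) and (b,3)
    sc.stop()
  }
}
```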

cogroup
Illustration
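A minimal cogroup sketch (the two sample RDDs are made up): for every key, cogroup returns the values from both RDDs as a pair of Iterables.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CogroupDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CogroupDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val left  = sc.parallelize(Array(("a", 1), ("b", 2)))
    val right = sc.parallelize(Array(("a", 10), ("c", 30)))
    // Every key that appears in either RDD gets (values from left, values from right)
    left.cogroup(right).collect().foreach(println)
    // Expected output (order may vary):
    // (a,(CompactBuffer(1),CompactBuffer(10)))
    // (b,(CompactBuffer(2),CompactBuffer()))
    // (c,(CompactBuffer(),CompactBuffer(30)))
    sc.stop()
  }
}
```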
join
Illustration
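A minimal join sketch (same illustrative data as the cogroup example): join is an inner join on the key, so only keys present in both RDDs survive.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JoinDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val left  = sc.parallelize(Array(("a", 1), ("b", 2)))
    val right = sc.parallelize(Array(("a", 10), ("c", 30)))
    // Inner join: only key "a" exists in both RDDs
    left.join(right).collect().foreach(println)
    // Expected output: (a,(1,10))
    sc.stop()
  }
}
```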

sortByKey
Illustration
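A minimal sortByKey sketch (illustrative data): sortByKey range-partitions the records by key and sorts within each partition; pass false for descending order.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SortByKeyDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val pairs = sc.parallelize(Array((3, "c"), (1, "a"), (2, "b")), 2)
    // Ascending sort by key across all partitions
    pairs.sortByKey(ascending = true).collect().foreach(println)
    // Expected output: (1,a) (2,b) (3,c)
    sc.stop()
  }
}
```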

union (merge)
principle

object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData1: Array[Int] = Array(1, 2, 3,5)
val targetData2: Array[Int] = Array(1, 2, 3,8)
val sc = new SparkContext(conf)
val initDataRDD1: RDD[Int] = sc.parallelize(targetData1,2)
val initDataRDD2: RDD[Int] = sc.parallelize(targetData2,2)
initDataRDD1.union(initDataRDD2).collect().foreach(println)
sc.stop()
}
}
result
1
2
3
5
1
2
3
8
Verifying the result
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData1: Array[Int] = Array(1, 2, 3,5)
val targetData2: Array[Int] = Array(1, 2, 3,8)
val sc = new SparkContext(conf)
val initDataRDD1: RDD[Int] = sc.parallelize(targetData1,2)
val initDataRDD2: RDD[Int] = sc.parallelize(targetData2,2)
initDataRDD1.union(initDataRDD2)
.mapPartitionsWithIndex((index,ite)=>{
println("==========")
println("I am partition " + index)
ite
}).foreach(println)
sc.stop()
}
}
Output (you can see that union pulls the upstream partition data directly: the original 2 partitions merged with the other RDD's 2 partitions give 4 partitions in total)
==========
I am partition 0
1
2
==========
I am partition 1
3
5
==========
I am partition 2
1
2
==========
I am partition 3
3
8
intersection
principle
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData1: Array[Int] = Array(1, 2, 3,5)
val targetData2: Array[Int] = Array(1, 2, 3,8)
val sc = new SparkContext(conf)
val initDataRDD1: RDD[Int] = sc.parallelize(targetData1,2)
val initDataRDD2: RDD[Int] = sc.parallelize(targetData2,1)
initDataRDD1.intersection(initDataRDD2)
.mapPartitionsWithIndex((index,ite)=>{
println("==========")
println("I am partition " + index)
ite
}).foreach(println)
sc.stop()
}
}
result
==========
I am partition 0
3
==========
I am partition 1
1
==========
I am partition 2
2
distinct
principle

test
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData1: Array[Int] = Array(1, 2, 3,5)
val targetData2: Array[Int] = Array(1, 2, 3,8)
val sc = new SparkContext(conf)
val initDataRDD1: RDD[Int] = sc.parallelize(targetData1,2)
val initDataRDD2: RDD[Int] = sc.parallelize(targetData2,2)
initDataRDD1.union(initDataRDD2).distinct()
.mapPartitionsWithIndex((index,ite)=>{
println("==========")
println("I am partition " + index)
ite
}).foreach(println)
sc.stop()
}
}
result
==========
I am partition 0
8
==========
I am partition 1
1
5
==========
I am partition 2
2
==========
I am partition 3
3
aggregate
principle

test
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val targetData1: Array[Int] = Array(1, 2, 3,5)
val sc = new SparkContext(conf)
val initDataRDD1: RDD[Int] = sc.parallelize(targetData1,2)
// The first argument is the initial (zero) value
// The second is the function applied within each partition
// The third is the function that combines the results across partitions
val res: Int = initDataRDD1.aggregate(0)(_ + _, _ + _)
println(res)
sc.stop()
}
}
result
11 (partition 0 gives 1 + 2 = 3, partition 1 gives 3 + 5 = 8, and 0 + 3 + 8 = 11)
cartesian (Cartesian product)
test
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val colors: Array[String] = Array("golden", "Yao Hei", "sky blue")
val phone: Array[String] = Array("Xiaomi", "Huawei")
val sc = new SparkContext(conf)
val colorRDD: RDD[String] = sc.parallelize(colors,2)
val phoneRDD: RDD[String] = sc.parallelize(phone,2)
colorRDD.cartesian(phoneRDD)
.mapPartitionsWithIndex((index,iter)=>{
println("===============")
println("I am partition " + index)
iter
}).foreach(println)
sc.stop()
}
}
result
===============
I am partition 0
(golden,Xiaomi)
===============
I am partition 1
(golden,Huawei)
===============
I am partition 2
(Yao Hei,Xiaomi)
(sky blue,Xiaomi)
===============
I am partition 3
(Yao Hei,Huawei)
(sky blue,Huawei)
Illustration
coalesce (merge partitions)
Test before use
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val colors: Array[String] = Array("golden", "Yao Hei", "sky blue")
val phone: Array[String] = Array("Xiaomi", "Huawei")
val sc = new SparkContext(conf)
val colorRDD: RDD[String] = sc.parallelize(colors,2)
val phoneRDD: RDD[String] = sc.parallelize(phone,2)
colorRDD.cartesian(phoneRDD)
.mapPartitionsWithIndex((index,iter)=>{
println("===============")
println("I am partition " + index)
iter
}).foreach(println)
sc.stop()
}
}
The result is 4 partitions
===============
I am partition 0
(golden,Xiaomi)
===============
I am partition 1
(golden,Huawei)
===============
I am partition 2
(Yao Hei,Xiaomi)
(sky blue,Xiaomi)
===============
I am partition 3
(Yao Hei,Huawei)
(sky blue,Huawei)
After use
object ScalaBibao {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ScalaBibao")
.setMaster("local")
val colors: Array[String] = Array("golden", "Yao Hei", "sky blue")
val phone: Array[String] = Array("Xiaomi", "Huawei")
val sc = new SparkContext(conf)
val colorRDD: RDD[String] = sc.parallelize(colors,2)
val phoneRDD: RDD[String] = sc.parallelize(phone,2)
// The second parameter of coalesce indicates whether to shuffle; using shuffle makes the data more evenly distributed
colorRDD.cartesian(phoneRDD).coalesce(2, false)
.mapPartitionsWithIndex((index,iter)=>{
println("===============")
println("I am partition " + index)
iter
}).foreach(println)
sc.stop()
}
}
result
===============
I am partition 0
(golden,Xiaomi)
(golden,Huawei)
===============
I am partition 1
(Yao Hei,Xiaomi)
(sky blue,Xiaomi)
(Yao Hei,Huawei)
(sky blue,Huawei)
Illustration
repartition
(i.e., coalesce with shuffle enabled)
coalesce(numPartitions, shuffle = true)
Illustration
- Internally, its shuffle works by attaching a key prefix to each record so the data can be redistributed through the shuffle; a minimal sketch follows.
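A minimal repartition sketch (the data and object name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RepartitionDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(1 to 8, 2)
    // repartition(4) is coalesce(4, shuffle = true): the data is redistributed
    // evenly across 4 partitions through a shuffle
    val repartitioned = data.repartition(4)
    println(repartitioned.getNumPartitions) // 4
    sc.stop()
  }
}
```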
takeSample
colorRDD.takeSample(true, 1)
The second argument specifies exactly how many elements to draw.
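A minimal runnable sketch (the colorRDD data here is illustrative). Note that, unlike sample, takeSample is an action and returns an Array to the driver:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TakeSampleDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TakeSampleDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val colorRDD = sc.parallelize(Array("golden", "Yao Hei", "sky blue"))
    // First argument: sample with replacement; second: exact number of elements;
    // an optional third argument fixes the random seed
    val picked: Array[String] = colorRDD.takeSample(withReplacement = true, num = 1)
    picked.foreach(println)
    sc.stop()
  }
}
```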
Shuffle tuning

The tuning parameters below correspond to the shuffle process shown in the figure above.

| Parameter | Description |
| --- | --- |
| spark.reducer.maxSizeInFlight | Size of the buffer a reduce task uses when fetching map output. If memory allows, increasing it helps reduce the number of network I/O round trips and raises throughput. Default: 48m. |
| spark.shuffle.compress | Whether to compress the data written out by map tasks. Default: true. |
| spark.io.compression.codec | Compression codec used by Spark. Default: lz4; Spark provides lz4, lzf, snappy and zstd. |
| spark.shuffle.file.buffer | Size of the map task's write buffer. Default: 32k. Increasing it appropriately reduces the number of disk I/O operations. |
| spark.shuffle.io.maxRetries | Maximum number of retries when a reduce task fails to fetch data. Default: 3. If the map side hits frequent GC pauses, fetches may fail and the task may be aborted; increasing this value improves the reduce task's chance of success. |
| spark.shuffle.io.retryWait | Interval to wait between fetch retries. Default: 5s. Increasing it appropriately improves the reduce task's chance of success. |
| spark.shuffle.io.numConnectionsPerPeer | Number of network connections that can be reused between a pair of hosts. Default: 1. |
| spark.shuffle.io.preferDirectBufs | Whether to prefer off-heap (direct) buffers. Default: true. |
| spark.shuffle.sort.bypassMergeThreshold | Default: 200. If the number of reduce partitions is within 200 and there is no map-side pre-aggregation, the map task skips sorting the data during the write phase, which improves performance. |
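A sketch of setting a few of these parameters on a SparkConf; the values are illustrative assumptions, not universal recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleTuningDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative values only; the right numbers depend on the job and the cluster
    val conf = new SparkConf()
      .setAppName("ShuffleTuningDemo")
      .setMaster("local")
      // bigger reduce-side fetch buffer: fewer network round trips
      .set("spark.reducer.maxSizeInFlight", "96m")
      // bigger map-side write buffer: fewer disk flushes
      .set("spark.shuffle.file.buffer", "64k")
      // be more tolerant of fetch failures caused by long GC pauses on the map side
      .set("spark.shuffle.io.maxRetries", "6")
      .set("spark.shuffle.io.retryWait", "10s")
    val sc = new SparkContext(conf)
    // ... job code ...
    sc.stop()
  }
}
```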