当前位置:网站首页>聊一聊Spark实现TopN的几种方式
聊一聊Spark实现TopN的几种方式
2022-07-15 14:59:00 【笑看风云路】
大家好,我是风云,欢迎大家关注我的个人原创公众号【笑看风云路】获取更多大数据技术干货,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己!
扫码进,更快捷:
前言
在实际开发过程中,我们会经常碰到求TopN这样常见的需求,那在Spark中,是如何实现求TopN呢?带着这个问题,就来看一下TopN的实现方式都有哪些!
方式1:采用groupByKey
思路:
按照key对数据进行聚合(groupByKey)
对同组的key的所有value先转换为List,然后进行排序,最后取TopN
代码实现:
// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
val result = rdd2.map(arr => (arr(0).trim, arr(1).trim.toInt))
.groupByKey()
.map {
case (key, values) => {
// 对values中的数据进行排序,然后获取最大的前三个数据
val sortedValues = values.toList.sorted
val top3Values = sortedValues.takeRight(3).reverse
(key, top3Values)
}
}
// 打印输出
result.collect().foreach(println)
sc.stop()
方式2:采用两阶段聚合优化
思路:
- 第一阶段给每个key加上一个随机值前缀,然后进行局部的聚合操作
- 第二阶段去除每个key的前缀,然后进行全局的聚合操作
代码实现:
// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
val result = rdd2.mapPartitions(iter => {
val random = Random
iter.map(arr => {
val key = arr(0).trim
val value = arr(1).trim.toInt
((random.nextInt(5),key),value)
})
}).groupByKey()
.flatMap{
case ((_,key),values) => {
val sortedValues = values.toList.sorted
val top3Values = sortedValues.takeRight(3).reverse
top3Values.map(count => (key,count))
}
}
.groupByKey()
.flatMap{
case (key, values) => {
val sortedValues = values.toList.sorted
val top3Values = sortedValues.takeRight(3).reverse
top3Values.map((key,_))
}
}
// 打印输出
result.collect().foreach(println)
sc.stop()
方式3:先获取每个分区的TopN,后获取全局TopN
思路:
- 对于每一个key获取每个分区中的TopN
- 做全局的数据聚合操作,获取TopN
代码实现:
// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
val result3 = rdd2.map(arr => {
val key = arr(0).trim
val count = arr(1).trim.toInt
(key, count)
})
.mapPartitions(iter => {
import scala.collection.mutable
val temp = iter.foldLeft(mutable.Map[String, ArrayBuffer[Int]]())((a, b) => {
val key = b._1
val count = b._2
val buf = a.getOrElseUpdate(key, new mutable.ArrayBuffer[Int]())
buf += count
if (buf.size > 3) {
val max3Vals = buf.sorted.takeRight(3)
a(key) = max3Vals
}
a
})
val top3IterPrePartition = temp.toList.flatMap {
case (key, countIters) => countIters.map(count => (key, count))
}
top3IterPrePartition.toIterator
})
.groupByKey()
.flatMap {
case (key, values) => {
val sorted = values.toList.sorted
val top3 = sorted.takeRight(3).reverse
top3.map((key, _))
}
}
result3.foreachPartition(iter => iter.foreach(println))
sc.stop()
方式4:采用aggregateByKey
思路:
- 初始值为mutable.ArrayBufferInt
- 对每组key中的每个value和之前的聚合值进行聚合操作,就是在分区中,来一个value和上次取出的TopN进行一次排序,取出新的TopN
- 对每个分区操作后的局部聚合结果进行合并聚合操作,就是在分区间,来一个分区和上次取出的TopN进行一次合并排序,取出新的TopN
代码实现:
// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
import scala.collection.mutable
val result4 = rdd2.map(arr => {
val key = arr(0).trim
val count = arr(1).trim.toInt
(key, count)
}).aggregateByKey(mutable.ArrayBuffer[Int]())(
(u, v) => {
u += v
u.sorted.takeRight(3).reverse
},
(u1, u2) => {
u1 ++= u2
u1.sorted.takeRight(3).reverse
}
).flatMap {
case (key, values) => {
values.toList.map((key, _))
}
}
result4.foreachPartition(iter => iter.foreach(println))
sc.stop()
优缺点
方式1的缺点:
- groupByKey会将相同key的所有value全部加载到内存进行处理,当value特别多的时候可能出现OOM异常
- groupByKey会将所有的value数据均发送给下一个RDD,性能比较低,因为在实际聚合操作中只需要部分数据
方式2的优缺点:
- 对于聚合类Shuffle操作(groupByKey,reduceByKey等)产生的问题能够很好的解决
- 对于非聚合类(join等)产生的问题很难使用该方法解决
方式3、方式4:
- 解决了方式1实现方式的两个缺点
- 都采用了先分区内预聚合,然后进行全局聚合的思想
结语
好了,今天就为大家分享到这里了。咱们下期见!
如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢
边栏推荐
- 1035. Disjoint lines
- X-Shell远程连接虚拟机
- Redis集群长时间连接不上问题Sending CLUSTER MEET messages to join the cluster Waiting for the cluster to join
- 【SQL注入】堆叠注入
- Shell编程之matrix---装逼又炫酷
- 下采样 - 信号相位和混叠
- Implementing DDD based on ABP -- aggregation and aggregation root practice
- Start of u-boot S analysis (II)
- Koin简单使用
- Browser compatibility testing system, method and process
猜你喜欢

The global cloud market is growing rapidly, and data security has entered a strong regulatory era of rule of law

LeNet

机器学习练习 5 - 偏差和方差

【SQL注入】堆叠注入
![[SQL injection] Stack Injection](/img/aa/6ad848479d492170fdd5d7613cbe64.png)
[SQL injection] Stack Injection

一种新的UI测试方法:视觉感知测试
![[detailed tutorial] a thorough article on mongodb aggregation query](/img/31/e0b7267edd4026ddb73773c9e1bbd0.png)
[detailed tutorial] a thorough article on mongodb aggregation query

Cookie与Session

Trees and binary trees

(手工)【sqli-labs38、39】堆叠注入、报错回显、字符/数字型
随机推荐
Create a list, add the strings "a", "B", "C", "d" and "d" in turn, and print the contents of the set, then remove all the strings "d" in the list, and print its contents again
OneNote代码高亮
(手工)【sqli-labs38、39】堆叠注入、报错回显、字符/数字型
Pat grade a a1079 total sales of supply chain
元宇宙带火的VR市场,字节也才摸到一点边
LeNet
How to disable SSLv3 in Apache
winform去掉右上角关闭按钮
Machine learning exercise 5 - bias and variance
C语言预处理指令
log4j.properties 日志详解
【详细教程】一文参透MongoDB聚合查询
下采样 - 信号相位和混叠
Audio and video SDP add bit rate
机器学习引言
【SpaceNet】SN6:Multi-Sensor All-Weather Mapping
codeblocks下载安装教程(完整详细)
梦想CMS 前台SQL注入
收银系统的硬件有哪些?
40+倍提升,详解 JuiceFS 元数据备份恢复性能优化之路