Flink 1.15: a complete set of DataSet module operators
2022-07-18 06:21:00 【Flying dried fish】
A complete collection of DataSet operators
Summary
This chapter mainly covers the DataSet operators; for the DataStream operators, see
DataStream API operators
The DataSet API and the DataStream API can both process unbounded and bounded stream data. Some specific transform methods differ, but the core logic is similar. It is worth noting that the DataSet API will be deprecated in later versions; the official website recommends replacing the DataSet API entirely with the Table API.
1. ExecutionEnvironment.getExecutionEnvironment(): returns the execution environment used to create DataSet objects, corresponding to the DataSet API.
2. StreamExecutionEnvironment.getExecutionEnvironment(): returns the execution environment used to create DataStream objects, corresponding to the DataStream API.
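A minimal sketch of the two entry points (standard Flink Java API classes; the sample data is only for illustration):
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class envDemo {
    public static void main(String[] args) throws Exception {
        // DataSet API entry point (batch)
        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> dataSet = batchEnv.fromElements(1, 2, 3);
        dataSet.print(); // print() triggers execution of the batch job

        // DataStream API entry point (streaming)
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = streamEnv.fromElements(1, 2, 3);
        dataStream.print();
        streamEnv.execute("envDemo"); // streaming jobs need an explicit execute()
    }
}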
1.map
Same as map in DataStream; nothing more to add.
2.mapPartition
Almost the same as map, except that map processes one record at a time, while mapPartition is passed an iterator over an entire partition. It is somewhat more efficient than map, since each call receives a batch of records instead of a single one; an obvious advantage is reduced per-record overhead and network transmission.
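A minimal sketch of mapPartition (the doubling logic is just an assumed example); note that the function receives an Iterable over one partition rather than a single record:
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
public class mapPartitionDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> text = env.fromElements(1, 2, 3, 4);
        text.mapPartition(new MapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                // called once per partition, not once per record
                for (Integer value : values) {
                    out.collect(value * 2); // the same per-record logic a map would apply
                }
            }
        }).print();
    }
}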
3.filter
Same as filter in DataStream; nothing more to add.
4.flatmap
Same as flatMap in DataStream; nothing more to add.
5.reduce
Applies the processing logic to a pair of elements to produce a new element, then applies the same logic to that new element and the next element, and so on, until the last element has been folded in.
Here is a summation example:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.ReduceOperator;
public class reduceDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> text = env.fromElements(1, 2, 3, 4);
ReduceOperator<Integer> reducer = text.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
});
reducer.print();
}
}
The result is: 10
6.groupBy
Groups the data by the specified field of each element and then operates on the data within each group, so groupBy is generally used in combination with other operators. Common scenarios: grouped sum, grouped maximum, grouped minimum.
The following shows three ways of computing a grouped sum:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
public class groupbyDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
Tuple2.of("a", 1L),
Tuple2.of("b", 2L),
Tuple2.of("a", 3L),
Tuple2.of("b", 4L),
Tuple2.of("c", 5L)
);
//1. dataSet.groupBy(0).sum(1).print(); // group by field 0, then sum field 1 within each group
//2. dataSet.groupBy(0).aggregate(Aggregations.SUM,1).print(); // same result using the aggregate operator
//3. the reduce variant:
dataSet.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
return Tuple2.of(value1.f0,value1.f1+value2.f1);
}
}).print();
}
}
result :
(a,4)
(b,6)
(c,5)
7.Aggregate
Aggregation operation. Aggregation is not the same as grouping, and you should understand the difference: grouping only splits the data into groups, while aggregation performs a logical calculation over the data. Common scenarios: aggregated maximum, aggregated minimum, aggregated sum. In other words, Aggregate defines the logical behavior applied to the data within a group, whereas groupBy only performs the grouping and defines no behavior for the data inside each group. So groupBy is usually combined with Aggregate: the former defines the grouping, the latter defines the logic within each group.
Look at the Java source of aggregate below. It receives two parameters:
Aggregations: the aggregator
field: the position of the field to aggregate
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
return new AggregateOperator<>(this, agg, field, Utils.getCallLocationName());
}
The Aggregations aggregator already implements maximum, minimum and sum for us. The source is as follows:
public enum Aggregations {
SUM(new SumAggregationFunction.SumAggregationFunctionFactory()),
MIN(new MinAggregationFunction.MinAggregationFunctionFactory()),
MAX(new MaxAggregationFunction.MaxAggregationFunctionFactory());
// --------------------------------------------------------------------------------------------
private final AggregationFunctionFactory factory;
private Aggregations(AggregationFunctionFactory factory) {
this.factory = factory;
}
// Note the method below: through the factory we could plug in our own aggregation logic,
// e.g. after summing keep only even results and filter out odd ones. This is rarely needed,
// but special behavior can be customized this way.
public AggregationFunctionFactory getFactory() {
return this.factory;
}
}
Here is a simple demo:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
public class AggregateDemo {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
Tuple2.of("a", 1L),
Tuple2.of("b", 2L),
Tuple2.of("a", 3L),
Tuple2.of("b", 4L),
Tuple2.of("c", 5L)
);
dataSet.aggregate(Aggregations.SUM,1).print();
dataSet.groupBy(0).aggregate(Aggregations.SUM,1).print();
}
}
8.distinct
Nothing much to say about this: it deduplicates records by the specified fields, as sketched below.
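A minimal sketch of distinct, with assumed sample data; distinct(0) keeps one record per distinct value of field 0 (which record is kept is arbitrary), while distinct() deduplicates whole records:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class distinctDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
                Tuple2.of("a", 1L),
                Tuple2.of("a", 1L),
                Tuple2.of("a", 2L),
                Tuple2.of("b", 3L)
        );
        dataSet.distinct().print();  // removes the fully duplicated ("a",1) record
        dataSet.distinct(0).print(); // one record per key: one "a" record and one "b" record remain
    }
}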
9.join
Similar to an inner join: records that do not find a match are discarded. See the demo below:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class joinDemo {
public static void main(String[] args) throws Exception{
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
Tuple2.of(" Zhang San ", 1L),
Tuple2.of(" Li Si ", 2L),
Tuple2.of(" Zhang San ", 3L),
Tuple2.of("33",4L)
);
DataSet<Tuple2<String, String>> dataSet2 = env.fromElements(
Tuple2.of(" Zhang San ", " Beijing "),
Tuple2.of(" Li Si ", " Shanghai "),
Tuple2.of(" Zhang San ", " China ")
);
dataSet.join(dataSet2).where(0).equalTo(0).print();
}
}
(( Zhang San ,1),( Zhang San , Beijing ))
(( Zhang San ,3),( Zhang San , Beijing ))
(( Zhang San ,1),( Zhang San , China ))
(( Zhang San ,3),( Zhang San , China ))
(( Li Si ,2),( Li Si , Shanghai ))
Similar to MySQL's left and right joins are:
leftOuterJoin and rightOuterJoin (a leftOuterJoin sketch is shown after the rightOuterJoin demo below).
10.rightOuterJoin
rightOuterJoin is used in combination with with(), although this usage feels a little odd.
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class rightOuterJoinDemo {
public static void main(String[] args) throws Exception{
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
Tuple2.of(" Zhang San ", 1L),
Tuple2.of(" Li Si ", 2L),
Tuple2.of(" Zhang San ", 3L),
Tuple2.of("33",4L)
);
DataSet<Tuple2<String, String>> dataSet2 = env.fromElements(
Tuple2.of(" Zhang San ", " Beijing "),
Tuple2.of(" Li Si ", " Shanghai "),
Tuple2.of(" Zhang San ", " China "),
Tuple2.of("88","99")
);
dataSet.rightOuterJoin(dataSet2).where(0).equalTo(0).with(new JoinFunction<Tuple2<String, Long>, Tuple2<String, String>, String>() {
@Override
public String join(Tuple2<String, Long> first, Tuple2<String, String> second) throws Exception {
if(first!=null){// Note: you must check for null here, otherwise an exception is thrown; the outer join does not fill in the missing side for you, which is rather unfriendly.
return first.toString() + second.toString();
}else {
return second.toString();
}
}
}).print();
}
}
The result is as follows:
(88,99)
( Zhang San ,1)( Zhang San , Beijing )
( Zhang San ,3)( Zhang San , Beijing )
( Zhang San ,1)( Zhang San , China )
( Zhang San ,3)( Zhang San , China )
( Li Si ,2)( Li Si , Shanghai )
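leftOuterJoin works the same way, except that for left-side records without a match it is second that may be null. A minimal sketch, assuming a reduced version of the data above:
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class leftOuterJoinDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
                Tuple2.of(" Zhang San ", 1L),
                Tuple2.of("33", 4L)
        );
        DataSet<Tuple2<String, String>> dataSet2 = env.fromElements(
                Tuple2.of(" Zhang San ", " Beijing ")
        );
        dataSet.leftOuterJoin(dataSet2).where(0).equalTo(0).with(new JoinFunction<Tuple2<String, Long>, Tuple2<String, String>, String>() {
            @Override
            public String join(Tuple2<String, Long> first, Tuple2<String, String> second) throws Exception {
                if (second != null) { // for a left outer join the right side may be missing
                    return first.toString() + second.toString();
                } else {
                    return first.toString();
                }
            }
        }).print();
    }
}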
11.coGroup
The following uses coGroup to implement inner-join semantics:
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class coGroup {
public static void main(String[] args) throws Exception{
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
Tuple2.of(" Zhang San ", 1L),
Tuple2.of(" abandon ", 1000L),
// Tuple2.of(" Zhang San ", 66L),
Tuple2.of(" Li Si ", 2L)
);
DataSet<Tuple2<String, Long>> dataSet2 = env.fromElements(
Tuple2.of(" Zhang San ", 8L),
Tuple2.of(" Li Si ", 9L),
Tuple2.of(" Zhang San ", 999999999L),
Tuple2.of(" abandoned ", 999999999L)
);
dataSet.coGroup(dataSet2).where(0).equalTo(0).with(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Long>> second, Collector<String> out) throws Exception {
// HashMap<String,Long> map = new HashMap<>();
for(Tuple2<String,Long> aaa:first){
for(Tuple2<String,Long> bbb:second){
out.collect(aaa.toString()+"@@@@@@@@@@@@@@@"+bbb.toString());
}
}
}
}).print();
}
}
result :
( Zhang San ,1)@@@@@@@@@@@@@@@( Zhang San ,8)
( Zhang San ,1)@@@@@@@@@@@@@@@( Zhang San ,999999999)
( Li Si ,2)@@@@@@@@@@@@@@@( Li Si ,9)
Note: if the first data stream contains duplicate keys, coGroup reports an error, while join does not. Pay attention to this; such details are hard to discover.
12.cross (Cartesian product)
cross computes the Cartesian product of the two data sets; with() defines how each pair of records is combined:
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class crossDemo {
public static void main(String[] args) throws Exception{
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> dataSet = env.fromElements(
Tuple2.of(" Zhang San ", 1L),
Tuple2.of(" abandon ", 1000L),
Tuple2.of(" Li Si ", 2L)
);
DataSet<Tuple2<String, Long>> dataSet2 = env.fromElements(
Tuple2.of(" Zhang San ", 8L),
Tuple2.of(" Li Si ", 9L)
);
dataSet.cross(dataSet2).with(new CrossFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public String cross(Tuple2<String, Long> val1, Tuple2<String, Long> val2) throws Exception {
return val1.toString()+val2.toString();
}
}).print();
}
}
( Zhang San ,1)( Zhang San ,8)
( Zhang San ,1)( Li Si ,9)
( abandon ,1000)( Zhang San ,8)
( abandon ,1000)( Li Si ,9)
( Li Si ,2)( Zhang San ,8)
( Li Si ,2)( Li Si ,9)
13.project
Selects the fields at the specified index positions; all other fields are dropped:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
public class projectDemo {
public static void main(String[] args) throws Exception{
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Long,Long>> dataSet = env.fromElements(
Tuple3.of(" Zhang San ", 1L,9L),
Tuple3.of(" abandon ", 1000L,9L),
Tuple3.of(" Li Si ", 2L,9L)
);
dataSet.project(0,2).print(); // keep fields 0 and 2
}
}
Output :
( Zhang San ,9)
( abandon ,9)
( Li Si ,9)