Flink from Beginner to Practice, Stage 2 (Runtime Architecture Illustrated)
2022-07-17 05:24:00 【顶尖高手养成计划】
System Architecture

Job Submission Process
High-Level Abstract View

Standalone Mode

YARN Cluster
Session Mode
1. First, request a JobManager from YARN.

2. The JobManager then processes the submitted jobs.

Per-Job Mode

Dataflow Graph

Parallelism
Flow Diagram

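Experiment
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkSoctet {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from the socket on host "master", port 9997
        DataStreamSource<String> initData = env.socketTextStream("master", 9997);
        // Split lines into words (parallelism 1), then map each word to (word, 1) (parallelism 2)
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String item, Collector<String> out) throws Exception {
                String[] resItem = item.split(" ");
                for (String s : resItem) {
                    out.collect(s);
                }
            }
        }).setParallelism(1).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                return Tuple2.of(item, 1);
            }
        }).setParallelism(2);
        // Group the stream of tuples by word and sum the counts (parallelism 3)
        map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).setParallelism(3).print();
        // This is a streaming job, so execute() keeps running until the job is cancelled
        env.execute();
    }
}
Result: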

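Conclusion
- Parallelism in Flink is set per operator and resembles a partition count in Spark. When adjacent operators have different parallelism, the data is redistributed between them, much like a repartition in Spark.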
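The per-word counts stay correct even though sum runs with parallelism 3. To see why, it helps to look at how keyBy decides which parallel subtask receives a record.

The following is a simplified plain-Java model, not Flink code. It follows the two-step shape of Flink's key-group assignment: key to key group, then key group to subtask index. However, it uses the key's hashCode() where Flink applies a murmur hash, so the subtask numbers it prints will not match a real job. The class KeyRoutingModel and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of keyBy routing. Assumption: plain hashCode()
// stands in for the murmur hash Flink uses, so real subtask numbers differ.
class KeyRoutingModel {

    // Step 1: spread the key's hash over maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: map the key group onto one of `parallelism` subtasks (0-based).
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Records, for each distinct word, which downstream subtask receives it.
    static Map<String, Integer> route(List<String> words, int maxParallelism, int parallelism) {
        Map<String, Integer> routing = new LinkedHashMap<>();
        for (String word : words) {
            routing.put(word, subtaskFor(word, maxParallelism, parallelism));
        }
        return routing;
    }

    public static void main(String[] args) {
        // 128 is the lower bound of Flink's default max parallelism; 3 matches sum(1).setParallelism(3)
        route(List.of("hello", "flink", "hello", "spark"), 128, 3)
                .forEach((word, subtask) -> System.out.println(word + " -> sum subtask " + subtask));
    }
}
```

The target subtask depends only on the key. Every occurrence of the same word therefore reaches the same sum subtask, so each word's running count is kept in exactly one place.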
Operator Chains

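Experiment
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkSoctet {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator of this job with parallelism 1
        env.setParallelism(1);
        DataStreamSource<String> initData = env.socketTextStream("master", 9997);
        // Split lines into words, then map each word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String item, Collector<String> out) throws Exception {
                String[] resItem = item.split(" ");
                for (String s : resItem) {
                    out.collect(s);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                return Tuple2.of(item, 1);
            }
        });
        // Group the stream of tuples by word and sum the counts
        map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        // This is a streaming job, so execute() keeps running until the job is cancelled
        env.execute();
    }
}
Result: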

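Conclusion
- Operators connected one-to-one (same parallelism, no redistribution of data) are chained together and run inside a single task.
- When the connection is one-to-many, meaning the data is redistributed (for example by keyBy), the operators run as separate tasks.
- This closely resembles how Spark splits a job into stages wherever a shuffle occurs.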
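The chaining rule in the conclusion can be sketched in plain Java.

This hypothetical ChainingModel checks only two conditions:
- the edge does not redistribute data (no keyBy);
- both operators have the same parallelism.

Flink's real decision also considers slot sharing groups, chaining strategies, and calls such as disableChaining() or startNewChain().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified chaining model. Assumption: only "no keyBy edge" and
// "equal parallelism" are checked; real Flink applies further conditions.
class ChainingModel {

    // One operator of a linear pipeline; keyedInput marks an incoming keyBy edge.
    static final class Op {
        final String name;
        final int parallelism;
        final boolean keyedInput;

        Op(String name, int parallelism, boolean keyedInput) {
            this.name = name;
            this.parallelism = parallelism;
            this.keyedInput = keyedInput;
        }
    }

    // Groups consecutive operators into chains; each chain becomes one task.
    static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        Op prev = null;
        for (Op op : pipeline) {
            // One-to-one (forward) edge: same parallelism and no redistribution.
            boolean forward = prev != null && !op.keyedInput && op.parallelism == prev.parallelism;
            if (!forward && !current.isEmpty()) {
                result.add(current); // close the previous chain
                current = new ArrayList<>();
            }
            current.add(op.name);
            prev = op;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // The second experiment: env.setParallelism(1) for the whole job.
        List<Op> job = List.of(
                new Op("Source: Socket", 1, false),
                new Op("Flat Map", 1, false),
                new Op("Map", 1, false),
                new Op("Keyed Aggregation", 1, true),
                new Op("Sink: Print", 1, false));
        for (List<String> chain : chains(job)) {
            System.out.println(String.join(" -> ", chain));
        }
    }
}
```

For the second experiment the model prints two chains, so the job has two tasks:
- Source: Socket -> Flat Map -> Map
- Keyed Aggregation -> Sink: Print

The keyBy edge is the only place where the chain is cut.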
Execution Graph
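As a rough illustration, the execution graph can be thought of as the parallel version of the job graph. Every job vertex (a task, possibly a chain of operators) is expanded into as many subtasks as its parallelism.

The hypothetical ExecutionGraphModel below does only that expansion and ignores the edges between subtasks. Its "name (i/p)" labels imitate the style Flink uses for subtasks in its logs and web UI.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: expands each job vertex into `parallelism` subtasks,
// ignoring the data exchange edges a real ExecutionGraph also contains.
class ExecutionGraphModel {

    static final class JobVertex {
        final String name;
        final int parallelism;

        JobVertex(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    // Lists every subtask as "name (i/p)" with a 1-based subtask index.
    static List<String> expand(List<JobVertex> jobGraph) {
        List<String> subtasks = new ArrayList<>();
        for (JobVertex v : jobGraph) {
            for (int i = 1; i <= v.parallelism; i++) {
                subtasks.add(v.name + " (" + i + "/" + v.parallelism + ")");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Hypothetical job graph: a chained source with parallelism 1, a map with parallelism 2.
        List<JobVertex> jobGraph = List.of(
                new JobVertex("Source -> Flat Map", 1),
                new JobVertex("Map", 2));
        expand(jobGraph).forEach(System.out::println);
    }
}
```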

Tasks and Task Slots
Theory
Sharing of Task Slots Among Tasks

Below are allocation examples that show the relationship between task slots and parallelism.
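The slot sharing examples can be approximated with a small placement model. It assumes that subtask i of every task goes into slot i.

Flink's scheduler does not promise this exact pairing. With the default slot sharing group, however, a slot holds at most one subtask of each task, so the number of slots used equals the highest parallelism in the job. SlotSharingModel and Task are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, idealized placement under default slot sharing.
// Assumption: subtask i of every task lands in slot i.
class SlotSharingModel {

    static final class Task {
        final String name;
        final int parallelism;

        Task(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    // Returns the subtasks held by each slot; the slot count equals the max parallelism.
    static List<List<String>> place(List<Task> tasks) {
        int slotCount = 0;
        for (Task t : tasks) {
            slotCount = Math.max(slotCount, t.parallelism);
        }
        List<List<String>> slots = new ArrayList<>();
        for (int s = 0; s < slotCount; s++) {
            slots.add(new ArrayList<>());
        }
        for (Task t : tasks) {
            for (int i = 0; i < t.parallelism; i++) {
                slots.get(i).add(t.name + " (" + (i + 1) + "/" + t.parallelism + ")");
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(new Task("Source -> Flat Map", 1), new Task("Map", 2), new Task("Sum", 3));
        List<List<String>> slots = place(tasks);
        for (int s = 0; s < slots.size(); s++) {
            System.out.println("slot " + s + ": " + slots.get(s));
        }
    }
}
```

With parallelisms 1, 2 and 3, slot 0 holds one subtask of every task, while slot 2 holds only Sum (3/3).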





Experiment
Preparation
The cluster currently has 3 task slots. What happens if we set the parallelism to 5?

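Experiment Code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkSoctet {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism 5 for the job (the socket source is non-parallel and still runs with parallelism 1)
        env.setParallelism(5);
        DataStreamSource<String> initData = env.socketTextStream("master", 9997);
        // Split lines into words, then map each word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String item, Collector<String> out) throws Exception {
                String[] resItem = item.split(" ");
                for (String s : resItem) {
                    out.collect(s);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                return Tuple2.of(item, 1);
            }
        });
        // Group the stream of tuples by word and sum the counts
        map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        // This is a streaming job, so execute() keeps running until the job is cancelled
        env.execute();
    }
}
Result: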

The job fails with an error because the 3 available slots are not enough.
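The slot arithmetic behind this error can be sketched as follows. With all operators in the default slot sharing group, a job needs as many slots as its highest operator parallelism.

Here the socket source is non-parallel and runs with parallelism 1, while the other operators inherit the default of 5. The job therefore needs 5 slots. The hypothetical SlotRequirementModel below only encodes that rule.

```java
// Hypothetical model: with a single (default) slot sharing group, the slot
// demand equals the highest operator parallelism in the job.
class SlotRequirementModel {

    static int requiredSlots(int... parallelisms) {
        int max = 0;
        for (int p : parallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    static boolean fits(int availableSlots, int... parallelisms) {
        return requiredSlots(parallelisms) <= availableSlots;
    }

    public static void main(String[] args) {
        // Third experiment: socket source 1, flatMap/map chain 5, sum/print chain 5, and 3 slots.
        int needed = requiredSlots(1, 5, 5);
        System.out.println("needed=" + needed + ", fits in 3 slots: " + fits(3, 1, 5, 5));
    }
}
```

Running main prints needed=5, fits in 3 slots: false.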

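Task Scheduling
You can decide how tasks are distributed across task slots to suit your own requirements. Treat this as background knowledge only, since slot sharing by itself is already an optimization of how tasks are executed.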
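One way to control placement is Flink's slotSharingGroup(String) method on an operator. Subtasks share slots only within the same group, and the default group is named "default".

The hypothetical SlotGroupModel below estimates the resulting slot demand as the sum, over groups, of the highest parallelism in each group. It is a simplification that takes every operator's group as given. In Flink, an operator without an explicit group inherits it from its inputs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of slot demand with several slot sharing groups:
// sum over groups of the largest parallelism inside each group.
class SlotGroupModel {

    static int requiredSlots(String[] groups, int[] parallelisms) {
        if (groups.length != parallelisms.length) {
            throw new IllegalArgumentException("one group per operator");
        }
        // Highest parallelism seen per group
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (int i = 0; i < groups.length; i++) {
            maxPerGroup.merge(groups[i], parallelisms[i], Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical job: sum (parallelism 3) moved into its own group "heavy".
        String[] groups = {"default", "default", "heavy"};
        int[] parallelisms = {1, 2, 3};
        System.out.println("required slots: " + requiredSlots(groups, parallelisms));
    }
}
```

Suppose the sum operator (parallelism 3) is moved into its own group while the other operators stay in "default" with parallelism up to 2. The demand then rises from 3 to 5 slots, because slots are no longer shared across the two groups.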
