Flink: From Getting Started to Practice - Stage 5 (Process Functions)
2022-07-17 11:18:00 【顶尖高手养成计划】
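Introduction

Usage

ProcessFunction

ProcessFunction runs on a non-keyed stream. It cannot register timers, because timers are only supported on keyed streams. It can still read the current watermark through its Context and send elements to side outputs, as the example below does.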
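Both examples below set the parallelism to 1. The reason is easier to see outside Flink. An operator with several parallel inputs forwards the minimum of the latest watermark received from each input, so a single subtask that never sees data pins the combined watermark at Long.MIN_VALUE.

The following plain-Java sketch models only that rule:
- It needs no Flink dependency.
- `MinWatermarkSketch` and its methods are hypothetical names.
- It ignores Flink's handling of idle sources.

```java
import java.util.Arrays;

// Hypothetical, simplified model of how an operator combines the watermarks of
// several parallel input channels. Flink's idle-source handling is ignored.
public class MinWatermarkSketch {
    private final long[] channelWatermarks;

    public MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        // Every channel starts at "negative infinity"
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel; a channel's watermark never moves backwards
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator's watermark is the minimum over all input channels
    public long combined() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkSketch twoChannels = new MinWatermarkSketch(2);
        twoChannels.onWatermark(0, 3999L); // channel 0 received data, channel 1 never did
        System.out.println(twoChannels.combined()); // -9223372036854775808

        MinWatermarkSketch oneChannel = new MinWatermarkSketch(1);
        oneChannel.onWatermark(0, 3999L);
        System.out.println(oneChannel.combined()); // 3999
    }
}
```

With two channels, the silent one keeps the result at Long.MIN_VALUE. With a single channel, the watermark follows the data.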
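```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
        // Parallelism must be 1 to see the effect. With more subtasks, some never receive data,
        // so their watermark stays at negative infinity (Long.MIN_VALUE). Because an operator's
        // watermark is the minimum over its inputs, it would then never advance.
        env.setParallelism(1);
        // f0 is just a name, f1 is the event time
        SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] s = value.split(" ");
                // An input such as "a 15" means 15 s; multiply by 1000 to get milliseconds
                return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
            }
        });
        // Assign timestamps and watermarks
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
                // Watermarks for an out-of-order stream, with the delay set to 2 s
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // Use f1 as the event time
                                return element.f1;
                            }
                        })
        );
        // Tag that identifies the side output (the anonymous subclass keeps the generic type)
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("outputTag") {
        };
        // Use a process function when the regular DataStream API is not enough
        SingleOutputStreamOperator<Tuple2<String, Long>> res = watermarks.process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                System.out.println("Current watermark: " + ctx.timerService().currentWatermark());
                if (value.f0.equals("a")) {
                    // Elements with key "a" go to the side output
                    ctx.output(outputTag, value);
                } else {
                    out.collect(value);
                }
            }
        });
        res.print("result");
        // Read the side-output stream
        DataStream<Tuple2<String, Long>> sideOutput = res.getSideOutput(outputTag);
        sideOutput.print("outputTag");
        env.execute();
    }
}
```

Start nc on Linux:

```
nc -lk 9999
```

Input:

```
a 1
b 1
```

Console output:

```
Current watermark: -9223372036854775808
outputTag> (a,1000)
Current watermark: -1001
result> (b,1000)
```

KeyedProcessFunction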
It can only be used after keyBy, that is, on a KeyedStream.

Using timers
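Before the code, a rough model of the timer service may help. According to the Flink documentation, event-time timers are kept per key and timestamp, so registering the same timestamp twice for one key leaves a single timer. A timer fires, calling onTimer, once the watermark reaches its timestamp.

The stdlib-only sketch below models those two rules with a TreeMap. `EventTimeTimerSketch` and its methods are hypothetical names; this is not Flink's internal timer implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified model of per-key event-time timers (not Flink internals).
public class EventTimeTimerSketch {
    // timestamp -> keys that registered a timer for that timestamp
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    // Like registerEventTimeTimer: at most one timer per (key, timestamp) pair
    public void register(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    // Advance the watermark; return "key@timestamp" for every timer with timestamp <= watermark
    public List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks only move forward
        }
        currentWatermark = watermark;
        NavigableMap<Long, Set<String>> due = timers.headMap(watermark, true);
        for (Map.Entry<Long, Set<String>> entry : due.entrySet()) {
            for (String key : entry.getValue()) {
                fired.add(key + "@" + entry.getKey());
            }
        }
        due.clear(); // clearing the view removes the fired timers from the map
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        service.register("a", 3000L); // element (a,1000) + 2 s
        service.register("a", 3000L); // duplicate registration, no second timer
        service.register("a", 6000L); // element (a,4000) + 2 s
        System.out.println(service.advanceWatermark(999L));           // []
        System.out.println(service.advanceWatermark(3999L));          // [a@3000]
        System.out.println(service.advanceWatermark(Long.MAX_VALUE)); // [a@6000]
    }
}
```

The sketch uses the timers from the run below and the watermarks 999, 3999 and finally Long.MAX_VALUE. As in the console output, nothing fires at 999 and the 3000 timer fires at 3999. The remaining timer only fires with the final watermark.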
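```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
        // Parallelism must be 1 to see the effect. With more subtasks, some never receive data,
        // so their watermark stays at negative infinity (Long.MIN_VALUE). Because an operator's
        // watermark is the minimum over its inputs, it would then never advance.
        env.setParallelism(1);
        // f0 is just a name, f1 is the event time
        SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] s = value.split(" ");
                // An input such as "a 15" means 15 s; multiply by 1000 to get milliseconds
                return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
            }
        });
        // Assign timestamps and watermarks
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
                // Watermarks for an out-of-order stream, with the delay set to 0 s
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // Use f1 as the event time
                                return element.f1;
                            }
                        })
        );
        // Use a process function when the regular DataStream API is not enough.
        // KeyedProcessFunction is only available after keyBy.
        watermarks.keyBy(data -> data.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        Long start = ctx.timestamp();
                        // Register an event-time timer 2 s after this element's timestamp
                        ctx.timerService().registerEventTimeTimer(start + 2 * 1000L);
                        System.out.println("start: " + start + " end: " + (start + 2 * 1000L));
                        out.collect(value);
                        System.out.println("Event time: " + start + " watermark: " + ctx.timerService().currentWatermark());
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        // Called once the watermark reaches the registered timestamp
                        System.out.println("Timer fired: " + timestamp + " watermark: " + ctx.timerService().currentWatermark());
                    }
                })
                .print();
        env.execute();
    }
}
```

Start nc on Linux:

```
nc -lk 9999
```

Input:

```
a 1
a 4
```

Output:

```
start: 1000 end: 3000
(a,1000)
Event time: 1000 watermark: -9223372036854775808
start: 4000 end: 6000
(a,4000)
Event time: 4000 watermark: 999
Timer fired: 3000 watermark: 3999
```

Conclusion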
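- The watermark starts at negative infinity (Long.MIN_VALUE). With a 0 s delay, the watermark is the largest event time seen so far minus 1 ms: 999 after the 1000 event and 3999 after the 4000 event. While an element is being processed, currentWatermark() still shows the value from before that element. An event-time timer fires once the watermark reaches its timestamp, so the timer registered for 3000 fires only after the 4000 event raises the watermark to 3999.
- When the program ends, the watermark is set to the maximum value (Long.MAX_VALUE), so any timers that have not fired yet are triggered.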
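Both bullet points follow from the arithmetic behind forBoundedOutOfOrderness:
- The generator tracks the largest timestamp seen so far.
- It emits that value minus the out-of-orderness bound minus 1 ms.
- Before any element arrives, it emits Long.MIN_VALUE.

The watermark is emitted periodically (every 200 ms by default). That is why processElement still sees the value from before the current element.

The minimal stdlib-only sketch below reproduces the printed values. `BoundedWatermarkSketch` is a hypothetical class, not Flink's generator.

```java
// Hypothetical model of the bounded-out-of-orderness watermark arithmetic.
public class BoundedWatermarkSketch {
    private final long delayMillis;
    private long maxTimestamp;

    public BoundedWatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
        // Chosen so that the watermark before any element is exactly Long.MIN_VALUE
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    // Called for every element: remember the largest event time seen so far
    public void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // Watermark = largest timestamp - delay - 1 ms
    public long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        // First example: 2 s delay, input "a 1" (1000 ms)
        BoundedWatermarkSketch twoSeconds = new BoundedWatermarkSketch(2000L);
        System.out.println(twoSeconds.currentWatermark()); // -9223372036854775808
        twoSeconds.onEvent(1000L);
        System.out.println(twoSeconds.currentWatermark()); // -1001

        // Second example: 0 s delay, inputs "a 1" then "a 4"
        BoundedWatermarkSketch noDelay = new BoundedWatermarkSketch(0L);
        noDelay.onEvent(1000L);
        System.out.println(noDelay.currentWatermark()); // 999
        noDelay.onEvent(4000L);
        System.out.println(noDelay.currentWatermark()); // 3999
    }
}
```

A late element with a smaller timestamp does not lower maxTimestamp, so the watermark never moves backwards.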