Flink from Beginner to Practice - Stage 5 (Process Functions)
2022-07-19 10:02:00 【Top master cultivation plan】
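Brief introduction

Using ProcessFunction
A ProcessFunction applied to an ordinary (non-keyed) DataStream cannot register timers; Flink only supports timers on keyed streams. It can still read the current watermark through ctx.timerService().currentWatermark() and send records to side outputs, as the first program shows.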
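To make the routing logic concrete without a running Flink cluster, here is a minimal plain-Java sketch of what the processElement method in the following program does with its side output. It is only a model under stated assumptions: SideOutputSketch, Event and route() are hypothetical names, and real side outputs are separate DataStreams obtained with getSideOutput, not in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of side-output routing.
// SideOutputSketch, Event and route() are hypothetical names used only for illustration.
class SideOutputSketch {
    // Stand-in for Tuple2<String, Long>: key = f0, timestampMs = f1
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }

        @Override
        public String toString() {
            return "(" + key + "," + timestampMs + ")";
        }
    }

    // The two outputs produced by one pass over the input
    static final class Routed {
        final List<Event> main = new ArrayList<>();
        final List<Event> side = new ArrayList<>();
    }

    static Routed route(List<Event> input) {
        Routed routed = new Routed();
        for (Event e : input) {
            if (e.key.equals("a")) {
                routed.side.add(e); // plays the role of ctx.output(outputTag, value)
            } else {
                routed.main.add(e); // plays the role of out.collect(value)
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        Routed r = route(List.of(new Event("a", 1000L), new Event("b", 1000L)));
        System.out.println("outputTag> " + r.side); // outputTag> [(a,1000)]
        System.out.println("result> " + r.main);    // result> [(b,1000)]
    }
}
```

The split is decided per record inside one operator, which is why a single ProcessFunction can feed both the main stream and the side stream without duplicating the input.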
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
        // Parallelism 1 keeps the effect visible: an operator's watermark is the minimum of the
        // watermarks of all its input channels. With parallelism > 1, a subtask that has not
        // received any data stays at Long.MIN_VALUE (negative infinity) and holds the overall
        // watermark back, so it never advances.
        env.setParallelism(1);
        // Parse each line "key seconds" into Tuple2(f0 = key, f1 = event time in milliseconds)
        SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] s = value.split(" ");
                // The console input gives seconds (e.g. "a 15"), so multiply by 1000 to get milliseconds
                return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
            }
        });
        // Assign timestamps and watermarks
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
                // Watermarks for an out-of-order stream, with a bounded out-of-orderness of 2 s
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // Use f1 as the event timestamp
                                return element.f1;
                            }
                        })
        );
        // Tag identifying the side output; the anonymous subclass ({}) preserves the generic type information
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("outputTag") {
        };
        // ProcessFunction can be applied to an ordinary (non-keyed) DataStream
        SingleOutputStreamOperator<Tuple2<String, Long>> res = watermarks.process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                System.out.println(" Current waterline : " + ctx.timerService().currentWatermark());
                if (value.f0.equals("a")) {
                    // Records with key "a" go to the side output
                    ctx.output(outputTag, value);
                } else {
                    out.collect(value);
                }
            }
        });
        res.print("result");
        // Read the side output stream
        DataStream<Tuple2<String, Long>> sideOutput = res.getSideOutput(outputTag);
        sideOutput.print("outputTag");
        env.execute();
    }
}

Start nc on Linux:
nc -lk 9999

Input:
a 1
b 1

Console output:
Current waterline : -9223372036854775808
outputTag> (a,1000)
Current waterline : -1001
result> (b,1000)

KeyedProcessFunction
KeyedProcessFunction can only be used on a keyed stream, that is, after keyBy.
Using timers
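Timers in a KeyedProcessFunction are scoped to the current key. The following single-threaded Java sketch is a simplified model of that behavior, not Flink's implementation: pending timers are kept per key and timestamp (registering the same pair twice keeps a single timer, as Flink does), and advancing the watermark fires, in timestamp order, every timer whose timestamp is at or below it. KeyedTimerSketch and its members are hypothetical; registerEventTimeTimer only mirrors the name of the real TimerService method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

// Simplified single-threaded model of keyed event-time timers; not Flink's implementation.
// KeyedTimerSketch and its members are hypothetical names for illustration.
class KeyedTimerSketch {
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    // Pending timers ordered by timestamp so they fire in event-time order
    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>((x, y) -> Long.compare(x.timestamp, y.timestamp));
    // Timestamps already pending per key, used to deduplicate timers
    private final Map<String, Set<Long>> registered = new HashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(String key, long timestamp) {
        // add() returns false if this (key, timestamp) pair is already pending
        if (registered.computeIfAbsent(key, k -> new HashSet<>()).add(timestamp)) {
            queue.add(new Timer(key, timestamp));
        }
    }

    // Advances the watermark (it never moves backwards) and returns the timers that fire
    List<Timer> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Timer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= currentWatermark) {
            Timer t = queue.poll();
            registered.get(t.key).remove(t.timestamp);
            fired.add(t);
        }
        return fired;
    }

    public static void main(String[] args) {
        KeyedTimerSketch timers = new KeyedTimerSketch();
        timers.registerEventTimeTimer("a", 3000L);                   // after event (a,1000)
        System.out.println(timers.advanceWatermark(999L));           // []
        timers.registerEventTimeTimer("a", 6000L);                   // after event (a,4000)
        System.out.println(timers.advanceWatermark(3999L));          // [a@3000]
        System.out.println(timers.advanceWatermark(Long.MAX_VALUE)); // [a@6000]
    }
}
```

The main method replays the input used in the run of the program that follows: the timer for 3000 stays pending while the watermark is 999 and fires once the watermark reaches 3999, while the timer for 6000 only fires when a final watermark of Long.MAX_VALUE arrives.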
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
        // Parallelism 1 keeps the effect visible: an operator's watermark is the minimum of the
        // watermarks of all its input channels. With parallelism > 1, a subtask that has not
        // received any data stays at Long.MIN_VALUE (negative infinity) and holds the overall
        // watermark back, so it never advances.
        env.setParallelism(1);
        // Parse each line "key seconds" into Tuple2(f0 = key, f1 = event time in milliseconds)
        SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] s = value.split(" ");
                // The console input gives seconds (e.g. "a 15"), so multiply by 1000 to get milliseconds
                return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
            }
        });
        // Assign timestamps and watermarks
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
                // Bounded out-of-orderness of 0 s: the watermark trails the largest timestamp by 1 ms
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // Use f1 as the event timestamp
                                return element.f1;
                            }
                        })
        );
        // KeyedProcessFunction can only be used after keyBy
        watermarks.keyBy(data -> data.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        Long start = ctx.timestamp();
                        // Register an event-time timer 2 s after this record's timestamp
                        ctx.timerService().registerEventTimeTimer(start + 2 * 1000L);
                        System.out.println("start: " + start + " end: " + (start + 2 * 1000L));
                        out.collect(value);
                        System.out.println(" Event time : " + start + " The water mark is :" + ctx.timerService().currentWatermark());
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        // Called once the watermark reaches or passes the registered timestamp
                        System.out.println(" The timer starts : " + timestamp + " The water mark is :" + ctx.timerService().currentWatermark());
                    }
                }).print();
        env.execute();
    }
}

Start nc on Linux:
nc -lk 9999

Input:
a 1
a 4

Output:
start: 1000 end: 3000
(a,1000)
Event time : 1000 The water mark is :-9223372036854775808
start: 4000 end: 6000
(a,4000)
Event time : 4000 The water mark is :999
The timer starts : 3000 The water mark is :3999

Conclusion
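- The initial watermark is negative infinity (Long.MIN_VALUE, -9223372036854775808). After that, the watermark equals the largest event time seen so far minus the out-of-orderness bound minus 1 ms; with a bound of 0 s, the event at 1000 yields a watermark of 999. A processElement call sees the watermark emitted before its own record, and a timer fires once the watermark reaches or passes the timer's timestamp: the timer registered for 3000 fired only after the event at 4000 raised the watermark to 3999.
- When the input ends, Flink emits a final watermark of positive infinity (Long.MAX_VALUE), which fires every timer that has not fired yet (in this example, the timer registered for 6000).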
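The printed watermark values can be reproduced with a little arithmetic. The sketch below is a plain-Java model, assuming the behavior of Flink's bounded-out-of-orderness generator: it tracks the largest timestamp seen and, when asked (Flink asks periodically, by default every 200 ms), emits that maximum minus the out-of-orderness bound minus 1 ms. The starting value is chosen so that the first watermark is exactly Long.MIN_VALUE without overflowing. BoundedWatermarkSketch is a hypothetical name, not a Flink class.

```java
// Plain-Java model of bounded-out-of-orderness watermark arithmetic (not the Flink class).
// BoundedWatermarkSketch is a hypothetical name used only for illustration.
class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    // Largest event timestamp seen so far; initialised so the first watermark is Long.MIN_VALUE
    private long maxTimestamp;

    BoundedWatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // A late (smaller) timestamp never lowers the maximum, so the watermark never goes back
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark = max timestamp seen - out-of-orderness bound - 1 ms
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch twoSeconds = new BoundedWatermarkSketch(2000L); // first program
        System.out.println(twoSeconds.currentWatermark()); // -9223372036854775808
        twoSeconds.onEvent(1000L);
        System.out.println(twoSeconds.currentWatermark()); // -1001

        BoundedWatermarkSketch zero = new BoundedWatermarkSketch(0L); // second program
        zero.onEvent(1000L);
        System.out.println(zero.currentWatermark()); // 999
        zero.onEvent(4000L);
        System.out.println(zero.currentWatermark()); // 3999
    }
}
```

These are the same values printed by both programs: -1001 with a 2 s bound, and 999 then 3999 with a 0 s bound, which is why the timer for 3000 could only fire after the record at 4000 arrived.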