Flink introduction to actual combat - phase IV (time and window diagram)
2022-07-19 10:01:00 【Top master cultivation plan】
Time semantics in Flink

Processing time (Processing Time)
Processing time is a simple concept: it is the system time of the machine that performs the processing operation.
Event time (Event Time)
Event time is the time at which each event actually occurred on the device that produced it, i.e. the time the data was generated.

Watermarks
Why are watermarks needed?
They mainly address the correctness of distributed data processing. With processing time, data belonging to 8:59 that only arrives after 9:00 can no longer be attributed to 8:59. A watermark, combined with a configured out-of-order delay, lets the data be processed correctly according to the time at which the events occurred.

Watermarks in an ordered stream (the ideal case)

Periodic insertion of watermarks in an ordered stream
Watermarks in an out-of-order stream (the realistic case)

Out-of-order stream
Processing rule: if a later record carries a timestamp smaller than the current watermark, the watermark is not advanced.

Emitting a watermark for every single record would be far too frequent, so instead a batch of data is handled periodically: the maximum timestamp seen so far is taken as the current watermark and propagated downstream.
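To make this "remember the maximum timestamp, emit a watermark periodically" idea concrete, below is a minimal sketch of a custom periodic generator based on Flink's WatermarkGenerator interface (available since Flink 1.11). The 2-second delay is just an assumed value, and the Event type is the POJO used later in this article; how often onPeriodicEmit is called is governed by env.getConfig().setAutoWatermarkInterval(...), which defaults to 200 ms. The built-in forBoundedOutOfOrderness strategy used below behaves in essentially this way.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Minimal sketch of a periodic watermark generator (assumed 2s out-of-orderness bound)
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
    private final long maxOutOfOrderness = 2000L;   // tolerated delay in ms (assumed value)
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // per record we only remember the largest timestamp seen so far; nothing is emitted here
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // periodically emit the watermark: the maximum timestamp seen minus the allowed delay
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}

Such a generator could be plugged in via WatermarkStrategy.forGenerator(ctx -> new BoundedOutOfOrdernessGenerator()), combined with the same withTimestampAssigner call shown in the code below.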

This raises the question of when a window should close.

To decide when the window closes, a delay mechanism is used. For example, with a delay of 2, when a record with timestamp 2 arrives the watermark is only set to 0; the window is only closed once the delayed watermark has caught up, i.e. once a watermark of 2 has actually come through.

Characteristics of watermarks
We now know that a watermark represents the current event-time clock. It is derived from the timestamps in the data,
with some delay added so that no data is lost, which is essential for handling out-of-order streams correctly.
The characteristics of a watermark can be summarized as follows:
- A watermark is a marker inserted into the data stream and can be regarded as a special piece of data
- The main content of a watermark is a timestamp, which indicates the progress of the current event time
Generating watermarks in code
Ordered stream
stream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
);
Out-of-order stream
stream.assignTimestampsAndWatermarks(
        // Insert watermarks for an out-of-order stream, with the delay set to 5s
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    // Logic for extracting the timestamp
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
).print();
Propagation of watermarks

- A task's own watermark is the minimum of the watermarks received from its upstream partitions
- When passing watermarks downstream, a task broadcasts its own watermark to all downstream partitions
- If a received watermark is smaller than the current watermark, the current watermark is left unchanged
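To illustrate these three rules, here is a small self-contained sketch; the class and method names are mine, purely for illustration, and this is not Flink's actual internal code. Each task remembers the latest watermark received per upstream channel, takes the minimum as its own event-time clock, and only ever lets it move forward.

import java.util.Arrays;

// Illustrative sketch of how a task could combine upstream watermarks (not Flink's real internals)
public class WatermarkMerger {
    private final long[] channelWatermarks;          // latest watermark received from each upstream channel
    private long currentWatermark = Long.MIN_VALUE;  // the task's own event-time clock

    public WatermarkMerger(int numUpstreamChannels) {
        channelWatermarks = new long[numUpstreamChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Called when upstream channel i delivers watermark wm; returns the task's current watermark
    public long onWatermark(int i, long wm) {
        // a watermark smaller than what this channel already reported is ignored
        channelWatermarks[i] = Math.max(channelWatermarks[i], wm);
        // the task's own watermark is the minimum across all upstream channels
        long minUpstream = Arrays.stream(channelWatermarks).min().getAsLong();
        if (minUpstream > currentWatermark) {
            currentWatermark = minUpstream;
        }
        return currentWatermark;
    }
}

Whenever the returned value has advanced, the task would broadcast the new watermark to all of its downstream partitions.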
Windows
A Flink window puts incoming data into different buckets according to time; Spark has no comparable ability to handle out-of-order events.

Classification of windows
Classification by driver type
A window is essentially a way of cutting a bounded slice out of the stream, so a key property of a window is how that slice is taken: driven by time (time windows) or by the number of records (count windows).

Classification by how the window assigns data
Tumbling windows (Tumbling Windows)
Split by time or by the number of records; windows follow one another without overlapping

Sliding windows (Sliding Windows)
Likewise split by time or by the number of records, with a slide interval that lets consecutive windows overlap

Session windows (Session Windows)
Split according to a session timeout (gap); counting records makes no sense here

Global windows (Global Windows)
All data of the same key goes into a single window; a custom trigger has to be defined to fire the computation for the window
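For reference, the session and global window assigners are typically wired up roughly as follows in the DataStream API. This is only a sketch: the 10-minute gap and the 100-record count are example values, and stream, the key, and the window functions are placeholders written in the same "..." style as the snippets in the next section.

// Event-time session windows: a new window starts when the gap between two
// records of the same key exceeds 10 minutes (example gap value)
stream.keyBy(...)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .aggregate(...)

// Global window: all records of a key fall into one window; nothing fires
// unless a custom trigger is attached, e.g. a count trigger every 100 records
stream.keyBy(...)
        .window(GlobalWindows.create())
        .trigger(CountTrigger.of(100))
        .process(...)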

Window API
Window assigners
Time windows
stream.keyBy(...)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(...)

stream.keyBy(...)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(...)

stream.keyBy(...)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .aggregate(...)

stream.keyBy(...)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(...)
Count windows
stream.keyBy(...)
        .countWindow(10)

stream.keyBy(...)
        .countWindow(10, 3)
Window functions
Window functions come in two kinds: incremental aggregate functions and full window functions.

Incremental aggregate function: the result is computed incrementally as each record arrives; when the window fires, the current result is simply passed downstream
Full window function: the data is buffered and only processed when the window fires
Incremental aggregate function: reduce
POJO
public class Event {
    public String id;
    public String name;
    public Long timeStemp;

    public Event() {
    }

    public Event(String id, String name, Long timeStemp) {
        this.id = id;
        this.name = name;
        this.timeStemp = timeStemp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", timeStemp=" + timeStemp +
                '}';
    }
}
Application
public class FlinkApp {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Event> initData = env.addSource(new SourceFunction<Event>() {
boolean flag = true;
String[] names = {"a", "boy", "mary"};
String[] urls = {"/baidu", "xinlang", "google"};
Random random = new Random();
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (flag) {
Thread.sleep(1000);
ctx.collect(new Event(
urls[random.nextInt(3)],
names[random.nextInt(3)],
Calendar.getInstance().getTimeInMillis()
));
}
}
@Override
public void cancel() {
flag = false;
}
});
// Set the watermark for the initial data
SingleOutputStreamOperator<Event> watermarksData = initData.assignTimestampsAndWatermarks(
// Insert watermarks for an out-of-order stream, with the delay set to 5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// The logic of extracting time stamps
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timeStemp;
}
})
);
// Map the data into (name, 1) tuples so it can be aggregated later
KeyedStream<Tuple2<String, Integer>, String> keyedStream = watermarksData.map(new MapFunction<Event, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Event value) throws Exception {
return Tuple2.of(value.name, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
// Open a tumbling window on the keyed data, with a window size of 10 seconds
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0,value1.f1+value2.f1);
}
}).print();
env.execute();
}
}
Incremental aggregation function: aggregate
public class FlinkApp {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Event> initData = env.addSource(new SourceFunction<Event>() {
boolean flag = true;
String[] names = {"a", "boy", "mary"};
String[] urls = {"/baidu", "xinlang", "google"};
Random random = new Random();
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (flag) {
Thread.sleep(1000);
ctx.collect(new Event(
urls[random.nextInt(3)],
names[random.nextInt(3)],
Calendar.getInstance().getTimeInMillis()
));
}
}
@Override
public void cancel() {
flag = false;
}
});
// Set the watermark for the initial data
SingleOutputStreamOperator<Event> watermarksData = initData.assignTimestampsAndWatermarks(
// Insert watermarks for an out-of-order stream, with the delay set to 5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// The logic of extracting time stamps
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timeStemp;
}
})
);
// Map the data into (name, 1) tuples so it can be aggregated later
KeyedStream<Tuple2<String, Integer>, String> keyedStream = watermarksData.map(new MapFunction<Event, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Event value) throws Exception {
return Tuple2.of(value.name, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
// Open a tumbling window on the keyed data, with a window size of 10 seconds
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// Aggregate within the window; the data is pre-aggregated incrementally as it arrives
.aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String,Integer>, Tuple2<String,Integer>>() {
// Initialize the intermediate state
@Override
public Tuple2<String, Integer> createAccumulator() {
return Tuple2.of("init",0);
}
// Called once for every record in the window; the first parameter is the new record, the second parameter is the accumulator
@Override
public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
return Tuple2.of(value.f0,accumulator.f1+value.f1);
}
// Called when the window is triggered
@Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
return accumulator;
}
// merge is only needed for session windows, where two accumulators are merged; for other window types it does not have to be implemented meaningfully
@Override
public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
return Tuple2.of(a.f0,a.f1+b.f1);
}
}).print();
env.execute();
}
}
Full window function: process
A full window function processes all the data together only after the window fires, whereas the pre-aggregation above processes each record as it arrives inside the window and outputs the result when the window fires.
public class FlinkApp {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Event> initData = env.addSource(new SourceFunction<Event>() {
boolean flag = true;
String[] names = {"a", "boy", "mary"};
String[] urls = {"/baidu", "xinlang", "google"};
Random random = new Random();
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (flag) {
Thread.sleep(1000);
ctx.collect(new Event(
urls[random.nextInt(3)],
names[random.nextInt(3)],
Calendar.getInstance().getTimeInMillis()
));
}
}
@Override
public void cancel() {
flag = false;
}
});
// Set the watermark for the initial data
SingleOutputStreamOperator<Event> watermarksData = initData.assignTimestampsAndWatermarks(
// Insert watermarks for an out-of-order stream, with the delay set to 5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// The logic of extracting time stamps
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timeStemp;
}
})
);
// Map the data into (name, 1) tuples so it can be aggregated later
KeyedStream<Tuple2<String, Integer>, String> keyedStream = watermarksData.map(new MapFunction<Event, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Event value) throws Exception {
return Tuple2.of(value.name, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
// Open a tumbling window on the keyed data, with a window size of 10 seconds
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// IN, OUT, KEY, W
.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
// The first parameter is the key, the second is the context object, the third holds all the data
// of the window once it fires, and the fourth is the collector that sends results downstream
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
Integer sum=0;
// Accumulate the data coming from the window
for (Tuple2<String, Integer> element : elements) {
sum += element.f1;
}
// Get the start time and end time of the window
long start = context.window().getStart();
long end = context.window().getEnd();
String res="key: "+s+" start: "+start+" end: "+end+" "+"sum: "+sum;
out.collect(res);
}
}).print();
env.execute();
}
}
Output result
11> key: a start: 1657974850000 end: 1657974860000 sum: 4
14> key: boy start: 1657974850000 end: 1657974860000 sum: 1
11> key: mary start: 1657974850000 end: 1657974860000 sum: 2
Combining aggregate and process
public class FlinkApp {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Event> initData = env.addSource(new SourceFunction<Event>() {
boolean flag = true;
String[] names = {"a", "boy", "mary"};
String[] urls = {"/baidu", "xinlang", "google"};
Random random = new Random();
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (flag) {
Thread.sleep(1000);
ctx.collect(new Event(
urls[random.nextInt(3)],
names[random.nextInt(3)],
Calendar.getInstance().getTimeInMillis()
));
}
}
@Override
public void cancel() {
flag = false;
}
});
// Set the watermark for the initial data
SingleOutputStreamOperator<Event> watermarksData = initData.assignTimestampsAndWatermarks(
// Insert watermarks for an out-of-order stream, with the delay set to 5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// The logic of extracting time stamps
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timeStemp;
}
})
);
// Map the data into (name, 1) tuples so it can be aggregated later
KeyedStream<Tuple2<String, Integer>, String> keyedStream = watermarksData.map(new MapFunction<Event, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Event value) throws Exception {
return Tuple2.of(value.name, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
// Open a tumbling window on the keyed data, with a window size of 10 seconds
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.aggregate(new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
return accumulator + value.f1;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
// The first type parameter is the result type produced by the preceding incremental aggregate function
// The second type parameter is the output type
// The third type parameter is the key type
// The fourth type parameter is the window type, here TimeWindow
}, new ProcessWindowFunction<Integer, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Integer, String, String, TimeWindow>.Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
// Because the preceding aggregate function has already reduced the window's data to a single value, the iterable contains exactly one element, so we simply take next()
Integer next = elements.iterator().next();
// Get the start time and end time of the window
long start = context.window().getStart();
long end = context.window().getEnd();
String res="key: "+s+" start: "+start+" end: "+end+" "+"sum: "+next;
out.collect(res);
}
}).print();
env.execute();
}
}
Processing late data (to deepen the understanding of windows and watermarks)
Application testing
public class FlinkApp {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
// Parallelism is set to 1 so that the effect can be observed: with a higher parallelism some partitions
// receive no data and their watermark stays at negative infinity; since a task's own watermark is the
// minimum of its upstream watermarks, the overall watermark could then never advance
env.setParallelism(1);
// The first field is a name, the second field represents the event time
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
// Suppose the console input is "a 15"; multiply 15 by 1000 to turn it into a millisecond timestamp
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
// Set the watermark strategy
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
// Insert watermarks for an out-of-order stream, with the delay set to 2s
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
// Specify the event time
return element.f1;
}
})
);
// Define the identification of a side output stream
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late") {
};
SingleOutputStreamOperator<Tuple2<String, Long>> result = watermarks.keyBy(data -> data.f0)
// The window size is 10s; note that this is event time
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// Allow late data after the window's end, i.e. the maximum lateness tolerated; since the watermark
// delay above is already 2s, adding these 2s means data up to 4 seconds late can still be handled
.allowedLateness(Time.seconds(2))
// Route data that arrives even later to the side output defined above
.sideOutputLateData(outputTag)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
return Tuple2.of(value1.f0, value2.f1 + value1.f1);
}
});
// .aggregate(...) — a window function could be chained here for further processing
result.print("result");
// Get the data of the side output stream
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
sideOutput.print("late");
env.execute();
}
}
On Linux, use nc:
nc -lk 9999
Then enter:
a 1
a 15
a 2
The result:
result> (a,1000)
late> (a,2000)
Schematic diagram

Summary:
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
means the watermark lags by 2 seconds: if the largest event timestamp seen so far is 5 seconds, the watermark without any delay would be 4999 ms, so with the 2-second delay it is 2999 ms. In the test above, "a 15" therefore pushes the watermark to 12999 ms, which is past both the window end (10000 ms) and the extra lateness allowance (12000 ms).
allowedLateness(Time.seconds(2)) means that for 2 more seconds of event time after the window's end has been reached, late data can still update the window; data arriving later than that is either dropped or sent to the side output stream, which is why "a 2" appears as late> (a,2000).