Flink from Getting Started to Practice - Stage 5 (Process Functions)

2022-07-19 10:02:00 Top master cultivation plan

Introduction

      The stream processing APIs introduced so far, whether basic transformations, aggregations, or the more complex window operations, all work by transforming a DataStream, so they are collectively called the DataStream API. This is the core of Flink programming.
To keep code expressive and easy to use, Flink provides several API layers, and the DataStream API is only the middle one. Process functions belong to the lowest layer and give direct access to timestamps, watermarks, and timers.

Usage

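ProcessFunction

Applied to a non-keyed stream, as in this example, a ProcessFunction can read the current watermark and write to side outputs, but it cannot register timers.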

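import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

public class FlinkApp {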
    public static void main(String[] args) throws Exception {
        // Get the execution environment 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);

        // Set the parallelism to 1 so the effect is visible. With a higher parallelism,
        // some partitions receive no data and their watermark stays at negative infinity.
        // An operator's watermark is the minimum over its input partitions, so it would
        // also stay at negative infinity and never advance.
        env.setParallelism(1);

        // The first field is a name; the second field is the event time in seconds
        SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] s = value.split(" ");
                // If the console input is "a 15", multiply 15 by 1000 to get the timestamp in milliseconds
                return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
            }
        });

        // Assign timestamps and watermarks
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
                // Generate watermarks for an out-of-order stream, with a delay of 2s
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // Use the second field as the event time
                                return element.f1;
                            }
                        })
        );

        // Define the tag that identifies the side output stream
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("outputTag") {
        };


        // Use a process function when the ordinary DataStream API is not enough
        SingleOutputStreamOperator<Tuple2<String, Long>> res = watermarks.process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                System.out.println("Current watermark: " + ctx.timerService().currentWatermark());
                if (value.f0.equals("a")) {
                    // Records named "a" go to the side output stream
                    ctx.output(outputTag, value);
                } else {
                    out.collect(value);
                }
            }
        });
        res.print("result");

        // Get the data of the side output stream
        DataStream<Tuple2<String, Long>> sideOutput = res.getSideOutput(outputTag);
        sideOutput.print("outputTag");


        env.execute();
    }
}

Start nc on Linux and type the input

nc -lk 9999
a 1
b 1

Console output

Current watermark: -9223372036854775808
outputTag> (a,1000)
Current watermark: -1001
result> (b,1000)
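
The -1001 in the output can be worked out by hand. Below is a rough plain-Java sketch, not Flink code, of the arithmetic behind forBoundedOutOfOrderness: the watermark is the largest timestamp seen so far, minus the delay, minus 1 ms. The class and method names (BoundedOutOfOrdernessSketch, onEvent, currentWatermark) are made up for this illustration.

```java
// Plain-Java sketch (assumption: this mirrors the arithmetic only, it is not
// the Flink class itself). All names here are hypothetical.
final class BoundedOutOfOrdernessSketch {
    private final long delayMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long delayMillis) {
        this.delayMillis = delayMillis;
        // Start value chosen so the first watermark is Long.MIN_VALUE without overflow.
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    // Remember the largest event timestamp seen so far.
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Watermark = largest timestamp - delay - 1 ms.
    long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(2000L);
        System.out.println(wm.currentWatermark()); // -9223372036854775808
        wm.onEvent(1000L);                         // input "a 1"
        System.out.println(wm.currentWatermark()); // -1001
    }
}
```

By default Flink emits watermarks periodically rather than per record, so processElement sees the watermark produced before the current element arrived. That is why the line for a 1 still shows negative infinity and the -1001 only appears when b 1 is processed.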

KeyedProcessFunction

It can only be used after keyBy

Using timers

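import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class FlinkApp {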
    public static void main(String[] args) throws Exception {
        // Get the execution environment 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);

        // Set the parallelism to 1 so the effect is visible. With a higher parallelism,
        // some partitions receive no data and their watermark stays at negative infinity.
        // An operator's watermark is the minimum over its input partitions, so it would
        // also stay at negative infinity and never advance.
        env.setParallelism(1);

        // The first field is a name; the second field is the event time in seconds
        SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] s = value.split(" ");
                // If the console input is "a 15", multiply 15 by 1000 to get the timestamp in milliseconds
                return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
            }
        });

        // Assign timestamps and watermarks
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
                // Generate watermarks for an out-of-order stream, with a delay of 0s
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // Use the second field as the event time
                                return element.f1;
                            }
                        })
        );


        // Use a process function when the ordinary DataStream API is not enough.
        // KeyedProcessFunction can only be used after keyBy.
        watermarks.keyBy(data -> data.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        // Event time of the current record
                        Long start = ctx.timestamp();
                        // Register an event-time timer 2 seconds after the record's event time
                        ctx.timerService().registerEventTimeTimer(start + 2 * 1000L);
                        System.out.println("start: " + start + " end: " + (start + 2 * 1000L));
                        out.collect(value);

                        System.out.println("Event time: " + start + ", watermark: " + ctx.timerService().currentWatermark());
                    }

                    @Override
                    public void onTimer(long timestamp, KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>.OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        // Called once the watermark reaches the timer's timestamp
                        System.out.println("Timer fired: " + timestamp + ", watermark: " + ctx.timerService().currentWatermark());
                    }
                }).print();

        env.execute();
    }
}

Start nc on Linux

nc -lk 9999

Input

a 1
a 4

Output

start: 1000 end: 3000
(a,1000)
Event time: 1000, watermark: -9223372036854775808
start: 4000 end: 6000
(a,4000)
Event time: 4000, watermark: 999
Timer fired: 3000, watermark: 3999
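
This run depends on the parallelism being 1, as the comment on env.setParallelism(1) in the code says. Here is a small plain-Java sketch, not Flink code, of the rule behind that comment: an operator with several inputs takes the minimum of the watermarks it has received from each one. The class MinWatermarkSketch and its method are hypothetical names used only here.

```java
import java.util.Arrays;

// Plain-Java sketch (assumption: it models only the "minimum over inputs" rule).
final class MinWatermarkSketch {
    // The operator's watermark is the smallest watermark among its input channels.
    static long operatorWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Two upstream partitions, both starting at negative infinity.
        long[] channels = {Long.MIN_VALUE, Long.MIN_VALUE};

        // Only the first partition has received data so far.
        channels[0] = 3999L;
        System.out.println(operatorWatermark(channels)); // still Long.MIN_VALUE

        // Once the second partition also advances, the minimum moves forward.
        channels[1] = 999L;
        System.out.println(operatorWatermark(channels)); // 999
    }
}
```

With a single socket source feeding several parallel subtasks, a partition that never receives a record keeps the minimum at negative infinity, so the timer at 3000 would not fire.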

Conclusion

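  • The watermark starts at negative infinity (Long.MIN_VALUE). With a delay of 0s, the watermark is the largest event time seen so far minus 1 millisecond (event time 1000 gives watermark 999), and a registered event-time timer fires once the watermark reaches or passes the timer's timestamp.
  • When the program ends, the watermark becomes positive infinity (Long.MAX_VALUE), which fires the timers that have not fired yet.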
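
The two points above can be replayed with a small plain-Java simulation. This is only a sketch under simplifying assumptions (a single key, no state, no Flink runtime), and EventTimeTimerSketch with its methods is a hypothetical stand-in, not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java sketch of event-time timers for one key (hypothetical names).
final class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    // Registering the same timestamp twice keeps a single timer.
    void registerEventTimeTimer(long timestamp) {
        if (!timers.contains(timestamp)) {
            timers.add(timestamp);
        }
    }

    // Advance the watermark and return the timers that fire, earliest first.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        service.registerEventTimeTimer(3000L);                        // from input "a 1"
        System.out.println(service.advanceWatermark(999L));           // []
        service.registerEventTimeTimer(6000L);                        // from input "a 4"
        System.out.println(service.advanceWatermark(3999L));          // [3000]
        System.out.println(service.advanceWatermark(Long.MAX_VALUE)); // [6000]
    }
}
```

The last call stands in for the end of the input: the timer at 6000 never saw a watermark of 6000 or more during the run, and only the final watermark fires it.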

Copyright notice
This article was written by [Top master cultivation plan]. Please include the original link when reposting. Thank you.
https://yzsam.com/2022/200/202207171117438257.html