flink窗口函数处理数据(watermark和sideoutput)(代码片段)

羲凡丞相 羲凡丞相     2022-12-13     623

关键词:

@羲凡——只为了更好的活着

Flink 窗口函数处理数据(Watermark和SideOutput)

统计过去5分钟内的一些数据是流处理中最常见的一种模式。这就涉及到经典的一个问题——数据延迟或乱序怎么办?

Flink,针对数据延迟或乱序有几个重要的解决思路,
1.添加水位线Watermark
2.推迟关闭窗口时间
3.超时数据的side输出

下面的例子是,统计10s内的数据,水位线位2s,窗口再延迟4s关闭,最后超时数据side输出

1.直接上代码
package flink.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import static org.apache.flink.streaming.api.windowing.time.Time.seconds;

public class Test 
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 并行度必须是1,不然数据会进入不同的线程中
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> inputStream = env.socketTextStream("ml20.com", 8888);
        // 超时数据输出的匿名子类对象
        OutputTag outputTag = new OutputTag<Tuple3<String, Long, Long>>("side") 
        ;
        // 将输入数据中的字段作为水位线
        DataStream<Tuple3<String, Long, Long>> dataStream = inputStream.map(new MapFunction<String, Tuple3<String, Long, Long>>() 
            @Override
            public Tuple3<String, Long, Long> map(String value) throws Exception 
                String[] arr = value.split(",");
                Tuple3<String, Long, Long> tuple3 = new Tuple3<>();
                tuple3.f0 = arr[0].trim();
                tuple3.f1 = Long.valueOf(arr[1].trim());
                tuple3.f2 = Long.valueOf(arr[2].trim());
                return tuple3;
            
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(seconds(2)) 
            @Override
            public long extractTimestamp(Tuple3<String, Long, Long> element) 
                return Long.valueOf(element.f1.toString()) * 1000;
            
        );
        // 聚合数据并读取每个窗口的开始和结束时间
        SingleOutputStreamOperator sumStream = dataStream
                .keyBy(0).timeWindow(seconds(10)).allowedLateness(seconds(4)).sideOutputLateData(outputTag)
                .aggregate(new AggregateFunction<Tuple3<String, Long, Long>, Long, Long>() 
                    @Override
                    public Long createAccumulator() 
                        return 0L;
                    

                    @Override
                    public Long add(Tuple3<String, Long, Long> value, Long accumulator) 
                        return accumulator + value.f2;
                    

                    @Override
                    public Long getResult(Long accumulator) 
                        return accumulator;
                    

                    @Override
                    public Long merge(Long a, Long b) 
                        return null;
                    
                , new WindowFunction<Long, Tuple4<String, Long, Long, Long>, Tuple, TimeWindow>() 
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<Tuple4<String, Long, Long, Long>> out) throws Exception 
                        long windowStart = window.getStart();
                        long windowEnd = window.getEnd();
                        //窗口集合的结果
                        Long aLong = input.iterator().next();
                        //输出数据
                        out.collect(new Tuple4<>(tuple.getField(0), windowStart, windowEnd, aLong));
                    
                );
        // 打印
        dataStream.print("data");
        sumStream.print("sum");
        sumStream.getSideOutput(outputTag).print("sideOutput");
        env.execute("Demo227");
    

2.测试数据(数据要一个一个的输入)
user1, 1592470610,1
user1, 1592470620,2
user1, 1592470621,3
user1, 1592470622,4
user1, 1592470612,5
user1, 1592470625,6
user1, 1592470614,7
user1, 1592470626,8
user1, 1592470616,9
3.测试结果
data> (user1,1592470610,1)
data> (user1,1592470620,2)
data> (user1,1592470621,3)
data> (user1,1592470622,4)
sum> (user1,1592470610000,1592470620000,1)
data> (user1,1592470612,5)
sum> (user1,1592470610000,1592470620000,6)
data> (user1,1592470625,6)
data> (user1,1592470614,7)
sum> (user1,1592470610000,1592470620000,13)
data> (user1,1592470626,8)
data> (user1,1592470616,9)
sideOutput> (user1,1592470616,9)

解释各位同学可能的问题
问:为啥在输入(user1,1592470620,2)后,没有触发10-20区间的计算?
答:因为我们设置了水位线时间为2秒,说白了就是向后等了2s时间再计算
问:为啥10-20区间的sum值是1而不是3?
答:因为窗口的区间是左闭右开的,10-20区间就是包含10不包含20
问:为啥(user1,1592470612,5)和(user1,1592470614,7)这两个值还能累加到10-20区间呢,而(user1,1592470616,9)不行?
答:我们设置的窗口延迟关闭4秒,所以10-20区间的延迟数据在22s-26s内到达还是能够累加到原先的数据上的。26s及之后的数据就只能到side里面了

参考1
参考2
参考3

====================================================================

@羲凡——只为了更好的活着

若对博客中有任何问题,欢迎留言交流

flink中window窗口和时间以及watermark水印(代码片段)

我们都知道,Flink的核心是流式处理,但同时也支持批处理,Flink底层是一个流式引擎,在这个上面实现了流处理和批处理,而窗口则是批处理的实现。在Flink中window从大的分类上主要有三种:TimeWindow(... 查看详情

flink窗口延迟数据处理allowedlateness

1.什么是迟到数据之前介绍过,Watermark可以用来平衡结果的完整性和延迟。除非你选择一种非常保守的Watermark生成策略,等待足够长的时间确保应该到的数据已经全部到达(以高延迟为代价确保了数据的完整性),否则你的应用... 查看详情

flink窗口延迟数据处理allowedlateness

1.什么是迟到数据之前介绍过,Watermark可以用来平衡结果的完整性和延迟。除非你选择一种非常保守的Watermark生成策略,等待足够长的时间确保应该到的数据已经全部到达(以高延迟为代价确保了数据的完整性),否则你的应用... 查看详情

flink详解系列之五--水位线(watermark)

...处理时,不可能无限期的等待延迟数据到达,当到达特定watermark时,认为在watermark之前的数据已经全部达到(即使后面还有延迟的数据),可以触发窗口计算,这个机制就是Watermark(水位线),具体如下图所示。watermark本质上是一个时间... 查看详情

flink之watermarker详解

...我们缺乏一种有效的方式来判断数据完整性,因此就有了WaterMark。watermark是建立在事件时间上的一个概念,用来刻画数据流的完整性。如果按照处理时间来衡量事件,一切都是有序的,完美的,自然而然也就无需watermark了。换句... 查看详情

flink的watermark概念解释(代码片段)

...的,也有可能是延迟的。这种情况下我们就定义出了watermark得到概念。为什么需要watermark?在flink当中,当我们基于eventtime进行窗口计算时,由于数据存在乱序和延迟到来的问题,即最先进入窗口计算的数据不... 查看详情

1分钟理解flink中watermark机制(代码片段)

...之处请在评论区帮忙指出,谢谢!目录前言一、watermark是什么?二、乱序数据处理三、迟到事件四、watermark的引入前言Flink中流处理由事件产生,经过source,再到operator,中间是有一个过程和时间的,虽... 查看详情

flink窗口window(代码片段)

...,与机器相关,默认的时间属性就是ProcessingTime。3.水位Watermark  由于事件产生的时间,和到达Flink的时间并不是完全有序的,可能先发生的时间却后达到Flink。因此需要需要设定一个延迟时间t,窗口不是到达长度之后就触发计... 查看详情

flink基于eventtime和watermark处理乱序事件和晚到的数据(代码片段)

...情况,这个时候基于窗口进行计算的结果就不对了,Flink中watermark就是为了解决这个问题的,理解watermark之前,先来说一下flink中的三个与流数据相关的概念,ProcessTime、EventTime、IngestionTime,不然很难理解watermark是怎么回事.我们先来看一... 查看详情

flink基于eventtime和watermark处理乱序事件和晚到的数据(代码片段)

...情况,这个时候基于窗口进行计算的结果就不对了,Flink中watermark就是为了解决这个问题的,理解watermark之前,先来说一下flink中的三个与流数据相关的概念,ProcessTime、EventTime、IngestionTime,不然很难理解watermark是怎么回事.我们先来看一... 查看详情

flink/scala-timewindow处理迟到数据详解(代码片段)

目录一.引言二.FlinkTimeWindow丢数据示例1.代码分析2.Watermark生成逻辑3.丢失数据代码测试三.Flink处理迟到数据策略1.forBoundedOutOfOrderness2.AllowedLateness3.SideOutputLateData四.Flink处理迟到数据实战1.代码分析2.迟到数据处理实战3.日志分析五.... 查看详情

flink/scala-timewindow处理迟到数据详解(代码片段)

目录一.引言二.FlinkTimeWindow丢数据示例1.代码分析2.Watermark生成逻辑3.丢失数据代码测试三.Flink处理迟到数据策略1.forBoundedOutOfOrderness2.AllowedLateness3.SideOutputLateData四.Flink处理迟到数据实战1.代码分析2.迟到数据处理实战3.日志分析五.... 查看详情

大数据(9e)图解flink窗口(代码片段)

...基于时间的窗口1.2、基于事件个数的窗口2、时间语义3、WaterMark(水位线)3.1、WaterMark策略3.2、WaterMark与窗口4、其它1、窗口的分类将无界限的数据切分为有界限的数据https://yellow520.blog.csdn.net/article/details/1212882401.1、基于... 查看详情

大数据(9e)图解flink窗口(代码片段)

...基于时间的窗口1.2、基于事件个数的窗口2、时间语义3、WaterMark(水位线)3.1、WaterMark策略3.2、WaterMark与窗口4、其它1、窗口的分类将无界限的数据切分为有界限的数据https://yellow520.blog.csdn.net/article/details/1212882401.1、基于... 查看详情

flink窗口处理函数windowfunction

...link系统性学习笔记在之前的文章中我们已经了解了Flink的窗口机制,并介绍了其中涉及的组件:WindowAssigner、WindowFunction、Trigger、Evictor。在Flink窗口分配器WindowAssigner中我们知道可以通过不同类型的窗口分配器WindowAssigner将元素分... 查看详情

flink的处理机制以及侧输出应用

参考技术AFlinkEventTime和Watermarkhttps://www.jianshu.com/p/5e735b63fb5bFlink只要不用时间窗口函数,就是基于事件处理,对于事件驱动的任务,我们需要关心的点,尤其是存在shuffle和聚合的时候:<1>是否存在数据倾斜<2>是否会存在... 查看详情

flink中的eventtimetrigger和processingtimetrigger详解

EventTimeTriggerEventTimeTrigger的触发完全依赖watermark,换言之,如果stream中没有watermark,就不会触发EventTimeTrigger。watermark之于事件时间就是如此重要,来看一下watermark的定义先~Watermarks是某个eventtime窗口中所有数据都... 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

...间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代码4.5运行实例测... 查看详情