关键词:
@羲凡——只为了更好的活着
Flink 窗口函数处理数据(Watermark和SideOutput)
统计过去5分钟内的一些数据是流处理中最常见的一种模式。这就涉及到经典的一个问题——数据延迟或乱序怎么办?
Flink,针对数据延迟或乱序有几个重要的解决思路,
1.添加水位线Watermark
2.推迟关闭窗口时间
3.超时数据的side输出
下面的例子是,统计10s内的数据,水位线位2s,窗口再延迟4s关闭,最后超时数据side输出
1.直接上代码
package flink.window;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import static org.apache.flink.streaming.api.windowing.time.Time.seconds;
public class Test
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 并行度必须是1,不然数据会进入不同的线程中
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> inputStream = env.socketTextStream("ml20.com", 8888);
// 超时数据输出的匿名子类对象
OutputTag outputTag = new OutputTag<Tuple3<String, Long, Long>>("side")
;
// 将输入数据中的字段作为水位线
DataStream<Tuple3<String, Long, Long>> dataStream = inputStream.map(new MapFunction<String, Tuple3<String, Long, Long>>()
@Override
public Tuple3<String, Long, Long> map(String value) throws Exception
String[] arr = value.split(",");
Tuple3<String, Long, Long> tuple3 = new Tuple3<>();
tuple3.f0 = arr[0].trim();
tuple3.f1 = Long.valueOf(arr[1].trim());
tuple3.f2 = Long.valueOf(arr[2].trim());
return tuple3;
).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(seconds(2))
@Override
public long extractTimestamp(Tuple3<String, Long, Long> element)
return Long.valueOf(element.f1.toString()) * 1000;
);
// 聚合数据并读取每个窗口的开始和结束时间
SingleOutputStreamOperator sumStream = dataStream
.keyBy(0).timeWindow(seconds(10)).allowedLateness(seconds(4)).sideOutputLateData(outputTag)
.aggregate(new AggregateFunction<Tuple3<String, Long, Long>, Long, Long>()
@Override
public Long createAccumulator()
return 0L;
@Override
public Long add(Tuple3<String, Long, Long> value, Long accumulator)
return accumulator + value.f2;
@Override
public Long getResult(Long accumulator)
return accumulator;
@Override
public Long merge(Long a, Long b)
return null;
, new WindowFunction<Long, Tuple4<String, Long, Long, Long>, Tuple, TimeWindow>()
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<Tuple4<String, Long, Long, Long>> out) throws Exception
long windowStart = window.getStart();
long windowEnd = window.getEnd();
//窗口集合的结果
Long aLong = input.iterator().next();
//输出数据
out.collect(new Tuple4<>(tuple.getField(0), windowStart, windowEnd, aLong));
);
// 打印
dataStream.print("data");
sumStream.print("sum");
sumStream.getSideOutput(outputTag).print("sideOutput");
env.execute("Demo227");
2.测试数据(数据要一个一个的输入)
user1, 1592470610,1
user1, 1592470620,2
user1, 1592470621,3
user1, 1592470622,4
user1, 1592470612,5
user1, 1592470625,6
user1, 1592470614,7
user1, 1592470626,8
user1, 1592470616,9
3.测试结果
data> (user1,1592470610,1)
data> (user1,1592470620,2)
data> (user1,1592470621,3)
data> (user1,1592470622,4)
sum> (user1,1592470610000,1592470620000,1)
data> (user1,1592470612,5)
sum> (user1,1592470610000,1592470620000,6)
data> (user1,1592470625,6)
data> (user1,1592470614,7)
sum> (user1,1592470610000,1592470620000,13)
data> (user1,1592470626,8)
data> (user1,1592470616,9)
sideOutput> (user1,1592470616,9)
解释各位同学可能的问题
问:为啥在输入(user1,1592470620,2)后,没有触发10-20区间的计算?
答:因为我们设置了水位线时间为2秒,说白了就是向后等了2s时间再计算
问:为啥10-20区间的sum值是1而不是3?
答:因为窗口的区间是左闭右开的,10-20区间就是包含10不包含20
问:为啥(user1,1592470612,5)和(user1,1592470614,7)这两个值还能累加到10-20区间呢,而(user1,1592470616,9)不行?
答:我们设置的窗口延迟关闭4秒,所以10-20区间的延迟数据在22s-26s内到达还是能够累加到原先的数据上的。26s及之后的数据就只能到side里面了
====================================================================
@羲凡——只为了更好的活着
若对博客中有任何问题,欢迎留言交流
flink中window窗口和时间以及watermark水印(代码片段)
我们都知道,Flink的核心是流式处理,但同时也支持批处理,Flink底层是一个流式引擎,在这个上面实现了流处理和批处理,而窗口则是批处理的实现。在Flink中window从大的分类上主要有三种:TimeWindow(... 查看详情
flink窗口延迟数据处理allowedlateness
1.什么是迟到数据之前介绍过,Watermark可以用来平衡结果的完整性和延迟。除非你选择一种非常保守的Watermark生成策略,等待足够长的时间确保应该到的数据已经全部到达(以高延迟为代价确保了数据的完整性),否则你的应用... 查看详情
flink窗口延迟数据处理allowedlateness
1.什么是迟到数据之前介绍过,Watermark可以用来平衡结果的完整性和延迟。除非你选择一种非常保守的Watermark生成策略,等待足够长的时间确保应该到的数据已经全部到达(以高延迟为代价确保了数据的完整性),否则你的应用... 查看详情
flink详解系列之五--水位线(watermark)
...处理时,不可能无限期的等待延迟数据到达,当到达特定watermark时,认为在watermark之前的数据已经全部达到(即使后面还有延迟的数据),可以触发窗口计算,这个机制就是Watermark(水位线),具体如下图所示。watermark本质上是一个时间... 查看详情
flink之watermarker详解
...我们缺乏一种有效的方式来判断数据完整性,因此就有了WaterMark。watermark是建立在事件时间上的一个概念,用来刻画数据流的完整性。如果按照处理时间来衡量事件,一切都是有序的,完美的,自然而然也就无需watermark了。换句... 查看详情
flink的watermark概念解释(代码片段)
...的,也有可能是延迟的。这种情况下我们就定义出了watermark得到概念。为什么需要watermark?在flink当中,当我们基于eventtime进行窗口计算时,由于数据存在乱序和延迟到来的问题,即最先进入窗口计算的数据不... 查看详情
1分钟理解flink中watermark机制(代码片段)
...之处请在评论区帮忙指出,谢谢!目录前言一、watermark是什么?二、乱序数据处理三、迟到事件四、watermark的引入前言Flink中流处理由事件产生,经过source,再到operator,中间是有一个过程和时间的,虽... 查看详情
flink窗口window(代码片段)
...,与机器相关,默认的时间属性就是ProcessingTime。3.水位Watermark 由于事件产生的时间,和到达Flink的时间并不是完全有序的,可能先发生的时间却后达到Flink。因此需要需要设定一个延迟时间t,窗口不是到达长度之后就触发计... 查看详情
flink基于eventtime和watermark处理乱序事件和晚到的数据(代码片段)
...情况,这个时候基于窗口进行计算的结果就不对了,Flink中watermark就是为了解决这个问题的,理解watermark之前,先来说一下flink中的三个与流数据相关的概念,ProcessTime、EventTime、IngestionTime,不然很难理解watermark是怎么回事.我们先来看一... 查看详情
flink基于eventtime和watermark处理乱序事件和晚到的数据(代码片段)
...情况,这个时候基于窗口进行计算的结果就不对了,Flink中watermark就是为了解决这个问题的,理解watermark之前,先来说一下flink中的三个与流数据相关的概念,ProcessTime、EventTime、IngestionTime,不然很难理解watermark是怎么回事.我们先来看一... 查看详情
flink/scala-timewindow处理迟到数据详解(代码片段)
目录一.引言二.FlinkTimeWindow丢数据示例1.代码分析2.Watermark生成逻辑3.丢失数据代码测试三.Flink处理迟到数据策略1.forBoundedOutOfOrderness2.AllowedLateness3.SideOutputLateData四.Flink处理迟到数据实战1.代码分析2.迟到数据处理实战3.日志分析五.... 查看详情
flink/scala-timewindow处理迟到数据详解(代码片段)
目录一.引言二.FlinkTimeWindow丢数据示例1.代码分析2.Watermark生成逻辑3.丢失数据代码测试三.Flink处理迟到数据策略1.forBoundedOutOfOrderness2.AllowedLateness3.SideOutputLateData四.Flink处理迟到数据实战1.代码分析2.迟到数据处理实战3.日志分析五.... 查看详情
大数据(9e)图解flink窗口(代码片段)
...基于时间的窗口1.2、基于事件个数的窗口2、时间语义3、WaterMark(水位线)3.1、WaterMark策略3.2、WaterMark与窗口4、其它1、窗口的分类将无界限的数据切分为有界限的数据https://yellow520.blog.csdn.net/article/details/1212882401.1、基于... 查看详情
大数据(9e)图解flink窗口(代码片段)
...基于时间的窗口1.2、基于事件个数的窗口2、时间语义3、WaterMark(水位线)3.1、WaterMark策略3.2、WaterMark与窗口4、其它1、窗口的分类将无界限的数据切分为有界限的数据https://yellow520.blog.csdn.net/article/details/1212882401.1、基于... 查看详情
flink窗口处理函数windowfunction
...link系统性学习笔记在之前的文章中我们已经了解了Flink的窗口机制,并介绍了其中涉及的组件:WindowAssigner、WindowFunction、Trigger、Evictor。在Flink窗口分配器WindowAssigner中我们知道可以通过不同类型的窗口分配器WindowAssigner将元素分... 查看详情
flink的处理机制以及侧输出应用
参考技术AFlinkEventTime和Watermarkhttps://www.jianshu.com/p/5e735b63fb5bFlink只要不用时间窗口函数,就是基于事件处理,对于事件驱动的任务,我们需要关心的点,尤其是存在shuffle和聚合的时候:<1>是否存在数据倾斜<2>是否会存在... 查看详情
flink中的eventtimetrigger和processingtimetrigger详解
EventTimeTriggerEventTimeTrigger的触发完全依赖watermark,换言之,如果stream中没有watermark,就不会触发EventTimeTrigger。watermark之于事件时间就是如此重要,来看一下watermark的定义先~Watermarks是某个eventtime窗口中所有数据都... 查看详情
12-flink-1.10.1-flink中的时间语义和watermark(代码片段)
...间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代码4.5运行实例测... 查看详情