flink学习(十四)flink窗口时间和水位线(代码片段)

wyh-study wyh-study     2023-03-16     218

关键词:

Flink 框架中支持事件时间、摄入时间和处理时间三种。而当我们在流式计算环境中数据从 Source 产生,再到转换和输出,这个过程由于网络和反压的原因会导致消息乱序。因此,需要有一个机制来解决这个问题,这个特别的机制就是“水位线”。

Flink 的窗口和时间
根据窗口数据划分的不同,目前 Flink 支持如下 3 种:

滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;

技术图片

滑动窗口,窗口数据有固定的大小,并且有生成间隔;

技术图片

会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加。

技术图片

 

 

 

技术图片

 

 

 


Flink 中的时间分为三种:

事件时间(Event Time),即事件实际发生的时间;
摄入时间(Ingestion Time),事件进入流处理框架的时间;
处理时间(Processing Time),事件被处理的时间。
下面的图详细说明了这三种时间的区别和联系:

技术图片

 

 

 

事件时间(Event Time

事件时间(Event Time)指的是数据产生的时间,这个时间一般由数据生产方自身携带,比如 Kafka 消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间。Event Time 从消息的产生就诞生了,不会改变,也是我们使用最频繁的时间。

利用 Event Time 需要指定如何生成事件时间的“水印”,并且一般和窗口配合使用,具体会在下面的“水印”内容中详细讲解。

我们可以在代码中指定 Flink 系统使用的时间类型为 EventTime:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置时间属性为 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

Flink 注册 EventTime 是通过 InternalTimerServiceImpl.registerEventTimeTimer 来实现的

可以看到,该方法有两个入参:namespace 和 time,其中 time 是触发定时器的时间,namespace 则被构造成为一个 TimerHeapInternalTimer 对象,然后将其放入 KeyGroupedInternalPriorityQueue 队列中。

那么 Flink 什么时候会使用这些 timer 触发计算呢?答案在这个方法里:

InternalTimeServiceImpl.advanceWatermark。
public void advanceWatermark(long time) throws Exception 
   currentWatermark = time;

   InternalTimer<K, N> timer;

   while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) 
      eventTimeTimersQueue.poll();
      keyContext.setCurrentKey(timer.getKey());
      triggerTarget.onEventTime(timer);
   

这个方法中的 while 循环部分会从 eventTimeTimersQueue 中依次取出触发时间小于参数 time 的所有定时器,调用 triggerTarget.onEventTime() 方法进行触发。

这就是 EventTime 从注册到触发的流程。

处理时间(Processing Time)
处理时间(Processing Time)指的是数据被 Flink 框架处理时机器的系统时间,Processing Time 是 Flink 的时间系统中最简单的概念,但是这个时间存在一定的不确定性,比如消息到达处理节点延迟等影响。

我们同样可以在代码中指定 Flink 系统使用的时间为 Processing Time:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

同样,也可以在源码中找到 Flink 是如何注册和使用 Processing Time 的。

 技术图片

registerProcessingTimeTimer() 方法为我们展示了如何注册一个 ProcessingTime 定时器:
每当一个新的定时器被加入到 processingTimeTimersQueue 这个优先级队列中时,如果新来的 Timer 时间戳更小,那么更小的这个 Timer 会被重新注册 ScheduledThreadPoolExecutor 定时执行器上。

Processing Time 被触发是在 InternalTimeServiceImpl 的 onProcessingTime() 方法中:

技术图片

一直循环获取时间小于入参 time 的所有定时器,并运行 triggerTarget 的 onProcessingTime() 方法。

摄入时间(Ingestion Time)
摄入时间(Ingestion Time)是事件进入 Flink 系统的时间,在 Flink 的 Source 中,每个事件会把当前时间作为时间戳,后续做窗口处理都会基于这个时间。理论上 Ingestion Time 处于 Event Time 和 Processing Time之间。

与事件时间相比,摄入时间无法处理延时和无序的情况,但是不需要明确执行如何生成 watermark。在系统内部,摄入时间采用更类似于事件时间的处理方式进行处理,但是有自动生成的时间戳和自动的 watermark。

可以防止 Flink 内部处理数据是发生乱序的情况,但无法解决数据到达 Flink 之前发生的乱序问题。如果需要处理此类问题,建议使用 EventTime。

Ingestion Time 的时间类型生成相关的代码在 AutomaticWatermarkContext 中。

 

水位线(WaterMark)
水位线(WaterMark)是 Flink 框架中最晦涩难懂的概念之一,有很大一部分原因是因为翻译的原因。

WaterMark 在正常的英文翻译中是水位,但是在 Flink 框架中,翻译为“水位线”更为合理,它在本质上是一个时间戳。

在上面的时间类型中我们知道,Flink 中的时间:
EventTime 每条数据都携带时间戳;

ProcessingTime 数据不携带任何时间戳的信息;
IngestionTime 和 EventTime 类似,不同的是 Flink 会使用系统时间作为时间戳绑定到每条数据,可以防止 Flink 内部处理数据是发生乱序的情况,但无法解决数据到达 Flink 之前发生的乱序问题。
所以,我们在处理消息乱序的情况时,会用 EventTime 和 WaterMark 进行配合使用。

首先我们要明确几个基本问题。

水印的本质是什么
水印的出现是为了解决实时计算中的数据乱序问题,它的本质是 DataStream 中一个带有时间戳的元素

如果 Flink 系统中出现了一个 WaterMark T,那么就意味着 EventTime < T 的数据都已经到达,窗口的结束时间和 T 相同的那个窗口被触发进行计算了。

也就是说:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记。

在程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。

技术图片

 

 

 

水位线是如何生成的技术图片

Flink 提供了 assignTimestampsAndWatermarks() 方法来实现水印的提取和指定,该方法接受的入参有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 两种。

整体的类图如下:

技术图片

 

 

 

 

 

 

水位线种类

技术图片
周期性水位线

我们在使用 AssignerWithPeriodicWatermarks 周期生成水印时,周期默认的时间是 200ms,这个时间的指定位置为:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) 
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) 
        getConfig().setAutoWatermarkInterval(0);
     else 
        getConfig().setAutoWatermarkInterval(200);
    

是否还记得上面我们在讲时间类型时会通过 env.setStreamTimeCharacteristic() 方法指定 Flink 系统的时间类型,这个 setStreamTimeCharacteristic() 方法中会做判断,如果用户传入的是 TimeCharacteristic.eventTime 类型,那么 AutoWatermarkInterval 的值则为 200ms ,如上述代码所示。当前我们也可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法来指定自动生成的时间间隔。

在上述的类图中可以看出,我们需要通过 TimestampAssigner 的 extractTimestamp() 方法来提取 EventTime。

Flink 在这里提供了 3 种提取 EventTime() 的方法,分别是:

AscendingTimestampExtractor
BoundedOutOfOrdernessTimestampExtractor
IngestionTimeExtractor
这三种方法中 BoundedOutOfOrdernessTimestampExtractor() 用的最多,需特别注意,在这个方法中的 maxOutOfOrderness 参数,该参数指的是允许数据乱序的时间范围。简单说,这种方式允许数据迟到 maxOutOfOrderness 这么长的时间。

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) 
        if (maxOutOfOrderness.toMilliseconds() < 0) 
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() 
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) 
            lastEmittedWatermark = potentialWM;
        
        return new Watermark(lastEmittedWatermark);
    

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) 
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) 
            currentMaxTimestamp = timestamp;
        
        return timestamp;
    

PunctuatedWatermark 水位线

这种水位线的生成方式 Flink 没有提供内置实现,它适用于根据接收到的消息判断是否需要产生水位线的情况,用这种水印生成的方式并不多见。

举个简单的例子,假如我们发现接收到的数据 MyData 中以字符串 watermark 开头则产生一个水位线:

data.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() 

      @Override
      public Watermark checkAndGetNextWatermark(MyData data, long l) 
        return data.getRecord.startsWith("watermark") ? new Watermark(l) : null;
      

      @Override
      public long extractTimestamp(MyData data, long l) 
        return data.getTimestamp();
      
    );
    
class MyData
    private String record;
    private Long timestamp;
    public String getRecord() 
        return record;
    
    public void setRecord(String record) 
        this.record = record;
    
    public Timestamp getTimestamp() 
        return timestamp;
    
    public void setTimestamp(Timestamp timestamp) 
        this.timestamp = timestamp;
    

案例
我们上面讲解了 Flink 关于水位线和时间的生成,以及使用,下面举一个例子来讲解。

模拟一个实时接收 Socket 的 DataStream 程序,代码中使用 AssignerWithPeriodicWatermarks 来设置水位线,将接收到的数据进行转换,分组并且在一个10
秒,间隔是5秒的滑动窗口内获取该窗口中第二个元素最小的那条数据。

package com.wyh.windowsApi

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


object WindowTest 
  def main(args: Array[String]): Unit = 
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //周期性生成watermark 默认是200毫秒
    env.getConfig.setAutoWatermarkInterval(100L)

    /**
      * 从文件中读取数据
      *
      *
      */
    //val stream = env.readTextFile("F:flink-studywyhFlinkSDdatasensor.txt")

    val stream = env.socketTextStream("localhost", 7777)

    //Transform操作
    val dataStream: DataStream[SensorReading] = stream.map(data => 
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    )
      //===到来的数据是升序的,准时发车,用assignAscendingTimestamps
      //指定哪个字段是时间戳 需要的是毫秒 * 1000
      //      .assignAscendingTimestamps(_.timestamp * 1000)
      //===处理乱序数据
      //      .assignTimestampsAndWatermarks(new MyAssignerPeriodic())
      //==底层也是周期性生成的一个方法 处理乱序数据 延迟1秒种生成水位 同时分配水位和时间戳 括号里传的是等待延迟的时间
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) 
      override def extractTimestamp(t: SensorReading): Long = 
        t.timestamp * 1000
      
    )

    //统计10秒内的最小温度
    val minTemPerWindowStream = dataStream
      .map(data => (data.id, data.temperature))
      .keyBy(0)
      //      .timeWindow(Time.seconds(10)) //开时间窗口  滚动窗口 没有数据的窗口不会触发
      //左闭右开 包含开始 不包含结束 延迟1秒触发的那个时间的数据不包含
      //可以直接调用底层方法,第三个参数传offset代表时区
      //.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8)))
      .timeWindow(Time.seconds(15), Time.seconds(5)) //滑动窗口,每隔5秒输出一次
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))) //用reduce做增量聚合


    minTemPerWindowStream.print("min temp")

    dataStream.print("input data")

    env.execute("window Test")

  




//设置水位线(水印) 这里有两种方式实现
//一种是周期性生成 一种是以数据的某种特性进行生成水位线(水印)
/**
  * 周期性生成watermark 默认200毫秒
  */
class MyAssignerPeriodic() extends AssignerWithPeriodicWatermarks[SensorReading] 
  val bound: Long = 60 * 1000
  var maxTs: Long = Long.MaxValue

  override def getCurrentWatermark: Watermark = 
    //定义一个规则进行生成
    new Watermark(maxTs - bound)
  

  //用什么抽取这个时间戳
  override def extractTimestamp(t: SensorReading, l: Long): Long = 
    //保存当前最大的时间戳
    maxTs = maxTs.max(t.timestamp)
    t.timestamp * 1000
  



/**
  * 乱序生成watermark
  * 每来一条数据就生成一个watermark
  */
class MyAssignerPunctuated() extends AssignerWithPunctuatedWatermarks[SensorReading] 
  override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = 
    new Watermark(l)
  

  override def extractTimestamp(t: SensorReading, l: Long): Long = 
    t.timestamp * 1000
  

 

flink窗口和水位线(代码片段)

...h的一个转折,因为通过窗口可以实现批流一体什么是水位线个人理解:水位线是处理乱序数据或迟到数据的,如果和窗口一起使用的话,水位线还有一个功能就是可以控制窗口的关闭时间窗口分类时间窗口: 查看详情

flink总结之一文彻底搞懂时间和窗口(代码片段)

...f08;ProcessingTime)3.摄入时间(IngestionTime)二、水位线(Watermark)1.什么是水位线2.水位线分类1.有序流中的水位线1、如何使用2.乱序流中的水位线2.1乱序流中如何保证数据的准确性2.2如何使用三、窗口1.什么是窗... 查看详情

flink详解系列之五--水位线(watermark)

参考技术A在Flink中,水位线是一种衡量EventTime进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、... 查看详情

flink窗口与水位线不得不说的秘密(代码片段)

        众所周知,ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界流进行有状态计算。在我们的这个Flink框架中,自Flink1.12.0正式发布流批一体统一运行之后,我们的实时计算框架真正步入了Flink的... 查看详情

flink学习之容错机制和状态一致性

...习中,我们学习了flink中的几个重要概念:时间、水位线和状态,今天我们继续学习flink中的两个重要机制:容错机制和状态一致性保证。对往期内容感兴趣的同学可以参考👇:链接:Flink学习中之time、watermark、s... 查看详情

flink窗口函数处理数据(watermark和sideoutput)(代码片段)

...c;针对数据延迟或乱序有几个重要的解决思路,1.添加水位线Watermark2.推迟关闭窗口时间3.超时数据的side输出下面的例子是 查看详情

flink窗口window(代码片段)

一、基本概念1.窗口分类  TimeWindow:按照时间生成Window。对于TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(TumblingWindow)、滑动窗口(SlidingWindow)和会话窗口(SessionWindow)。  CountWindow:按照指定的数据条... 查看详情

学习笔记flink——flink数据流模型时间窗口和核心概念

一、Flink编程数据流模型1.1、Flink–API封装Flink提供不同级别的API封装来支持流/批处理应用程序。1.2、Flink-编程数据流Source:一个不会结束的数据记录流。Transformations:使用一个或多个数据流作为输入,生成一个或多... 查看详情

学习笔记flink——flink数据流模型时间窗口和核心概念

一、Flink编程数据流模型1.1、Flink–API封装Flink提供不同级别的API封装来支持流/批处理应用程序。1.2、Flink-编程数据流Source:一个不会结束的数据记录流。Transformations:使用一个或多个数据流作为输入,生成一个或多... 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

...录1Fink中的时间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代码4.... 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

...录1Fink中的时间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代码4.... 查看详情

flink/scala-timewindow处理迟到数据详解(代码片段)

...c;窗口中可能出现数据迟到的情况,即Event时间戳小于水位线Watermark,此时数据流未乱序流,水位线不能保证小于自己的时间戳的Event不会到来。这时会出现一种情况,水位线到达窗口触发时间触发窗口计算,... 查看详情

flink/scala-timewindow处理迟到数据详解(代码片段)

...c;窗口中可能出现数据迟到的情况,即Event时间戳小于水位线Watermark,此时数据流未乱序流,水位线不能保证小于自己的时间戳的Event不会到来。这时会出现一种情况,水位线到达窗口触发时间触发窗口计算,... 查看详情

flink原理学习窗口和时间(代码片段)

Flink原理学习之窗口和时间文章目录Flink原理学习之窗口和时间一、Flink的时间类型二、Watermark三、Flink窗口机制Java、大数据开发学习要点(持续更新中…)一、Flink的时间类型Flink的时间语义分为三种:EventTime:即... 查看详情

flink原理学习窗口和时间(代码片段)

Flink原理学习之窗口和时间文章目录Flink原理学习之窗口和时间一、Flink的时间类型二、Watermark三、Flink窗口机制Java、大数据开发学习要点(持续更新中…)一、Flink的时间类型Flink的时间语义分为三种:EventTime:即... 查看详情

flinkdatastreamapi(代码片段)

...二章Flink 环境部署第三章FlinkDataStreamAPI第四章Flink窗口和水位线第五章FlinkTableAPI&SQL第六章新闻热搜实时分析系统一、DataStreamAPI是什么?Flink中的DataStream程序是对数据流(例如过滤、更新状态、定义窗口、聚合)... 查看详情

flinkwatermaker详解

参考技术AWaterMaker水位线在很多地方都有应用,其含义也不尽相同,在Flink中,水位线是用来触发窗口计算的其本质相当于一个flink流中的一个带时间戳的Event,Flink在处理这个特殊Event的时候,会认为该Event携带的时间戳之前时间... 查看详情

flink-demo-根据事件时间触发窗口计算

...20180504-113419)使用的apiAssignerWithPunctuatedWatermarks 用于发射水位线,默认当水位线大于窗口大小时触发窗口计算FlatMapFunction 将数据进行切割,分成3元组ReduceFunction 将多个3元组进行合并,输出为一个窗口的统计3元组... 查看详情