flink窗口window(代码片段)

noyouth noyouth     2023-03-21     174

关键词:

一、基本概念

1.窗口分类

  TimeWindow:按照时间生成 Window。对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(TumblingWindow)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

  CountWindow:按照指定的数据条数生成一个 Window,与时间无关。

2.时间分类

  Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。

  Ingestion Time:是数据进入 Flink 的时间。

  Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。
3.水位Watermark
  由于事件产生的时间,和到达Flink的时间并不是完全有序的,可能先发生的时间却后达到Flink。因此需要需要设定一个延迟时间t,窗口不是到达长度之后就触发计算,而是到达长度+延迟t之后才触发计算。而水位watermark就是窗口当前数据的时间戳减去延迟时间,表示小于watermark的数据都已经到达了(不含watermark)。

 

二、案例演示

案例1:按Processing Time划分滚动时间窗口

  TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
  在下面的代码中,因为没有指定时间类型,所以采用了默认的Processing Time,即Flink实际计算数据的时间, 通过.timeWindow(Time.seconds(10))设定窗口的大小为10秒,当一条数据进来后开始计时,10秒之后输出这个窗口中所有数据的计算结果。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest 

  def main(args: Array[String]): Unit = 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val socketStream = env.socketTextStream("hadoop102",7777)

    val dataStream: DataStream[SensorReading] = socketStream.map(d => 
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    )

    //统计10秒内的最小温度
    val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10)) //10秒滚动窗口,不指定时间特性,默认为ProcessingTime
      .reduce((data1, data2)=>(data1._1,data1._2.min(data2._2)))

    //打印原始的dataStream
    dataStream.print("data stream")

    //打印窗口数据流
    minTemperatureStream.print("min temperature")

    env.execute("window test")

  


  测试:  

  连续输入两条数据

[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 40.8

  在一个10秒的滚动窗口内,窗口流minTemperatureStream 只输出了一条数据。此时触发TimeWindow去计算的时机就是第一条数据来的10秒过后。

data stream> SensorReading(sensor_1,1547718200,30.8)
data stream> SensorReading(sensor_1,1547718201,40.8)
min temperature> (sensor_1,30.8)

 

案例2:带水位的滚动时间窗口

  代码分析:

  ①通过env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)设定窗口的时间特性为事件时间。

  ②在assignTimestampsAndWatermarks()方法中,传递一个BoundedOutOfOrdernessTimestampExtractor类实现对象,构造器参数就是容忍的延迟时间,实现方法,指明时间戳用哪个字段。

object WindowTest 

  def main(args: Array[String]): Unit = 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val socketStream = env.socketTextStream("hadoop102",7777)

    val dataStream: DataStream[SensorReading] = socketStream
      .map(d => 
        val arr = d.split(",")
        SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
      )
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) 
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
      )

      //.assignAscendingTimestamps(_.timestamp) //升序数据添指定时间戳

    //统计5秒内的最小温度
    val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) //5秒滚动窗口
      .reduce((data1, data2)=>(data1._1,data1._2.min(data2._2)))

    //打印原始的dataStream
    dataStream.print("data stream")

    //打印窗口数据流
    minTemperatureStream.print("min temperature")

    env.execute("window test")

  


  测试:

  当输入第一条数据时,时间戳是1547718200(单位秒),因为窗口的长度为5,所以理论上当时间戳为1547718205的数据来后,窗口会打印输出,但是由于设定了延迟2秒,所以此时水位才到1547718203,所以只有当时间戳为1547718207或之后的数据到来,水位线涨到大于等于1547718205时,窗口才会触发计算并关闭。

  sockt输入数据如下

[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30.8
sensor_1, 1547718201, 31         
sensor_1, 1547718202, 32  
sensor_1, 1547718203, 33  
sensor_1, 1547718204, 34  
sensor_1, 1547718205, 35  
sensor_1, 1547718206, 36  
sensor_1, 1547718207, 37  
sensor_1, 1547718208,38

  控制台打印如下:

data stream> SensorReading(sensor_1,1547718200,30.8)
data stream> SensorReading(sensor_1,1547718201,31.0)
data stream> SensorReading(sensor_1,1547718202,32.0)
data stream> SensorReading(sensor_1,1547718203,33.0)
data stream> SensorReading(sensor_1,1547718204,34.0)
data stream> SensorReading(sensor_1,1547718205,35.0)
data stream> SensorReading(sensor_1,1547718206,36.0)
data stream> SensorReading(sensor_1,1547718207,37.0)
min temperature> (sensor_1,30.8)
data stream> SensorReading(sensor_1,1547718208,38.0)

 

案例3:滑动时间窗口

  滑动窗口和滚动窗口特性类似,滚动窗口可以看作一种特殊的滑动窗口,其窗口长度与滑动长度一样。在.timeWindow(Time.seconds(10),Time.seconds(5)) 方法中,设定了窗口的长度为10,滑动长度为5。窗口长度决定了窗口计算的数据的范围有多大,而滑动长度决定了窗口计算并关闭的时机。

//统计10秒内的最小温度,5秒输出一次
val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10),Time.seconds(5)) //滑动窗口
  .reduce((data1, data2)=>(data1._1,data1._2.min(data2._2)))

  

 

 

 

flink源码解读系列datastream窗口window实现(代码片段)

...户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。窗口本质上是将无界数据流进行有界数据处理的过程,一般 查看详情

flink源码解读系列datastream窗口window实现(代码片段)

...户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。窗口本质上是将无界数据流进行有界数据处理的过程,一般 查看详情

大数据开发-flink-窗口全解析(代码片段)

Flink窗口背景Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而Window就是从Streaming到Batch的桥梁。通俗讲,Window是用来对一个无限的流设置一个有限的集合,从... 查看详情

flink原理与实现:window机制(代码片段)

...是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从Streaming到Batch的一个桥梁。Flink提供了非常完善的窗口机制,这是我认为的Flink最大的亮点之一(其他的亮点包括消息乱序处理,和che 查看详情

flink中的时间和窗口(代码片段)

目录6.3窗口(Window)6.3.1窗口的概念 6.3.2窗口的分类6.3.3窗口API概览6.3.4窗口分配器(WindowAssigners)6.3.5窗口函数(WindowFunctions)6.3窗口(Window)在流处理中,我们往往需要面对的是连续不断、无... 查看详情

flink的窗口聚合操作(timecountwindow)(代码片段)

窗口基本概念:Flink中的窗口是左闭右开的窗口Flink认为批处理是流处理的一个特例,而窗口window就是从流处理到批处理的一个桥梁,通常来讲窗口就是用来将无线数据流转换为优先数据集,从而在优先数据集上进... 查看详情

大数据开发-flink-窗口全解析(代码片段)

Flink窗口背景Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。而Window就是从Streaming到Batch的桥梁。通俗讲,Window是用来对一个无限的流设置一个有限的集合,从... 查看详情

大数据(9e)图解flink窗口(代码片段)

文章目录1、窗口的分类1.1、基于时间的窗口1.2、基于事件个数的窗口2、时间语义3、WaterMark(水位线)3.1、WaterMark策略3.2、WaterMark与窗口4、其它1、窗口的分类将无界限的数据切分为有界限的数据https://yellow520.blog.csdn.net/a... 查看详情

大数据(9e)图解flink窗口(代码片段)

文章目录1、窗口的分类1.1、基于时间的窗口1.2、基于事件个数的窗口2、时间语义3、WaterMark(水位线)3.1、WaterMark策略3.2、WaterMark与窗口4、其它1、窗口的分类将无界限的数据切分为有界限的数据https://yellow520.blog.csdn.net/a... 查看详情

flink源码解读系列datastream窗口分配器winowassigner(代码片段)

在前一篇文章Flink源码解读系列DataStream窗口Window实现中,我们了解到Flink窗口Window有两种具体实现,一个是TimeWindow,一个是GlobalWindow。有了窗口之后,我们如何将元素分配给窗口呢?在这篇文章中我们重点了解一下窗口分配器Win... 查看详情

flink源码解读系列datastream窗口分配器winowassigner(代码片段)

在前一篇文章Flink源码解读系列DataStream窗口Window实现中,我们了解到Flink窗口Window有两种具体实现,一个是TimeWindow,一个是GlobalWindow。有了窗口之后,我们如何将元素分配给窗口呢?在这篇文章中我们重点了解一下窗口分配器Win... 查看详情

11.flink四大基石window窗口的分类flink提供了很多各种场景用的windowassigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动(代码片段)

...马程序员贺岁视频的学习笔记11.Flink四大基石12.Window12.1.窗口的分类12.2.API12.2.1.window和windowAll12.2.2.API调用示例12.2.3.Flink提供了很多各种场景用的WindowAssigner12.3.代码演示–基于时间的滚动和滑动–掌握12.3.1.本地ubuntu下安装nc12.3.2.... 查看详情

flink中window窗口和时间以及watermark水印(代码片段)

...式引擎,在这个上面实现了流处理和批处理,而窗口则是批处理的实现。在Flink中window从大的分类上主要有三种:TimeWindow(根据时间)、CountWindow(根据数据量)、SessionWindow(会话窗口)窗口类型有... 查看详情

flink窗口介绍及应用(代码片段)

Windows是Flink流计算的核心,本文将概括的介绍几种窗口的概念,重点只放在窗口的应用上。本实验的数据采用自拟电影评分数据(userId,movieId,rating,timestamp),userId和movieId范围分别为1-100和1-200的随机数,rating范围为[0:0.5:5.0]一共1... 查看详情

flink系列窗口随笔(代码片段)

文章目录窗口概念WindowAPI窗口函数Aggregate示例其他API说明总结窗口概念一句基于Flink流处理引擎的块处理的最佳方案。窗口类型:时间窗口,计数窗口基本流程:1.先对数据分组2.先开窗3.窗口函数,对窗口实现怎... 查看详情

flink的窗口计算案例(代码片段)

窗口的分类按照时间生成Window,为TimeWindow,根据窗口实现原理可分为三类:滚动窗口(TumblingWindow):将数据依据固定的窗口长度对数据进行分片。滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,... 查看详情

flinkwindow窗口算子(代码片段)

【README】本文记录了窗口算子操作;本文使用的flink为1.14.4版本;本文部分内容总结自flink官方文档:窗口|ApacheFlink窗口#窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中࿰... 查看详情

flink窗口和水位线(代码片段)

什么是窗口我认为的窗口是无限处理流的一个核心,因为stream是流数据,而batch是批数据,window也是stream通往batch的一个转折,因为通过窗口可以实现批流一体什么是水位线个人理解:水位线是处理乱序数据或迟到数据的... 查看详情