「flink」flink中的时间类型(代码片段)

ilovezihan ilovezihan     2023-04-22     205

关键词:

Flink中的时间类型和窗口是非常重要概念,是学习Flink必须要掌握的两个知识点。

Flink中的时间类型

时间类型介绍

Flink流式处理中支持不同类型的时间。分为以下几种:

  1. 处理时间
    • Flink程序执行对应操作的系统时间。所有基于时间的操作(例如:时间窗口)都将使用运行相应operator的系统时间。例如:每个小时的处理时间窗口包括在系统时间范围内所有operator接收到的记录。例如:如果应用程序在09:15开始运行,则第一个滚动时间窗口将包括:09:15 – 10:00 之间的处理事件,下一个窗口包括上午10:00 – 11:00之间的处理事件
    • 这种处理时间方式实时性是最好的,但数据未必准确
  2. 事件时间
    • 每个事件发生的时间。这个时间一般是在进入到Flink之前就包含在事件中
    • 针对Eventtime,事件被处理的时间以来与事件本身
    • Eventtime必须要指定如何生成Eventtime Watermark(水印)
    • 理想情况,不管事件何时到达或者顺序如何,事件时间处理能够得到完整一致地结果。
    • 事件处理在等待乱序事件时,会产生一些延迟。这样会对Eventtime的应用性能有一定的影响
  3. 摄入时间
    • 摄入时间是事件进入Flink的时间
    • 在source operator中,每个记录以时间戳的形式获取源的当前时间
    • 它在概念是处于事件时间和处理时间中间
    • 摄入时间不能处理乱序问题或者延迟数据,摄入时间可以由流式系统自动生成水印

Flink支持的这几种时间刚好和我们上一篇播客中的内容相对应。

https://www.cnblogs.com/ilovezihan/p/12254479.html

应用一张Flink官网的图。

技术图片

Flink代码中设置时间类型

通常,我们在Flink初始化流式运行环境时,就会设置流处理时间特性。这个设置很重要,它决定了数据流的行为方式。(例如:是否需要给事件分配时间戳),以及窗口操作应该使用什么样的时间类型。例如:KeyedStream.timeWindow(Time.seconds(30))。


我们接下来通过实现一个每5秒中进行一次单词计数的案例,来说明Flink中如何指定时间类型。

public class WordCountWindow 
    public static void main(String[] args) throws Exception 
        // 1. 初始化流式运行环境
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. 设置时间处理类型,这里设置的方式处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 3. 定义数据源,每秒发送一个hadoop单词
        DataStreamSource<String> wordDS = env.addSource(new RichSourceFunction<String>() 

            private boolean isCanaled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception 
                while (!isCanaled) 
                    ctx.collect("hadooop");
                    Thread.sleep(1000);
                
            

            @Override
            public void cancel() 
                isCanaled = true;
            
        );

        // 4. 每5秒进行一次,分组统计
        // 4.1 转换为元组
        wordDS.map(word -> Tuple2.of(word, 1))
                // 指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 按照单词进行分组
                .keyBy(t -> t.f0)
                // 滚动窗口,3秒计算一次
                .timeWindow(Time.seconds(3))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() 
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception 
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    
                , new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() 
                    @Override
                    public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception 

                        // 打印窗口开始、结束时间
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("窗口开始时间:" + sdf.format(window.getStart())
                                + " 窗口结束时间:" + sdf.format(window.getEnd())
                                + " 窗口计算时间:" + sdf.format(System.currentTimeMillis()));

                        int sum = 0;
                        Iterator<Tuple2<String, Integer>> iterator = input.iterator();
                        while(iterator.hasNext()) 
                            Integer count = iterator.next().f1;
                            sum += count;
                        
                        out.collect(Tuple2.of(word, sum));
                    
                ).print();

        env.execute("app");
    

窗口开始时间:2020-02-05 00:22:21 窗口结束时间:2020-02-05 00:22:24 窗口计算时间:2020-02-05 00:22:24
4> (hadooop,2)
窗口开始时间:2020-02-05 00:22:24 窗口结束时间:2020-02-05 00:22:27 窗口计算时间:2020-02-05 00:22:27
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:27 窗口结束时间:2020-02-05 00:22:30 窗口计算时间:2020-02-05 00:22:30
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:30 窗口结束时间:2020-02-05 00:22:33 窗口计算时间:2020-02-05 00:22:33
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:33 窗口结束时间:2020-02-05 00:22:36 窗口计算时间:2020-02-05 00:22:36
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:36 窗口结束时间:2020-02-05 00:22:39 窗口计算时间:2020-02-05 00:22:39

我们可以看到,这个滚动窗口,每3秒计算一次,是按照系统时间来计算的。

我们再把时间窗口设置为1分钟,再试试。

窗口开始时间:2020-02-05 00:27:00 窗口结束时间:2020-02-05 00:28:00 窗口计算时间:2020-02-05 00:28:00
4> (hadooop,32)

窗口开始时间:2020-02-05 00:28:00 窗口结束时间:2020-02-05 00:29:00 窗口计算时间:2020-02-05 00:29:00
4> (hadooop,60)

刚好在 00:27:00 – 00:28:00之间。


参考文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html

flink中的时间和窗口(代码片段)

目录6.1时间语义6.1.1Flink中的时间语义6.1.2哪种时间语义更重要6.2水位线(Watermark)6.2.1事件时间和窗口6.2.2什么是水位线6.2.3如何生成水位线6.2.4水位线的传递 6.2.5水位线的总结6.1时间语义6.1.1Flink中的时间语义Flink是一个... 查看详情

flink中的time与window(代码片段)

...间的不同概念EventTime:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳IngestionTime:是数据进入Flink的时间ProcessingTime:是... 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

1Fink中的时间语义 EventTime:事件产生的时候的时间,比如日志访问日志产生的时间IngestionTime:数据进入flink的时间ProcessingTime:执行操作算子的本地系统时间,与机器无关1.1哪种时间语义更重要不同的时间语义有不... 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

目录1Fink中的时间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代... 查看详情

flink---wordcount(代码片段)

Flink处理的类型:DataStream,类似于SparkStreaming中的DStream文章目录一、导入依赖二、编写代码一、导入依赖<!--Java版--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifac 查看详情

13-flink-1.10.1-flink状态管理(代码片段)

目录1Flink中的状态2算子状态(OperatorState) 2.1算子状态类型3键控状态(keyedState) 3.1 键控状态数据结构3.2 键控状态使用3.3 实例3.3.1第一种实现方式3.3.2第二种实现方式1Flink中的状态由一... 查看详情

13-flink-1.10.1-flink状态管理(代码片段)

目录1Flink中的状态2算子状态(OperatorState) 2.1算子状态类型3键控状态(keyedState) 3.1 键控状态数据结构3.2 键控状态使用3.3 实例3.3.1第一种实现方式3.3.2第二种实现方式1Flink中的状态由一... 查看详情

flink原理学习窗口和时间(代码片段)

...生的时间,一旦确定就不会发生变化。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。ProcessingTime:即处理时间,是... 查看详情

flink原理学习窗口和时间(代码片段)

...生的时间,一旦确定就不会发生变化。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。ProcessingTime:即处理时间,是... 查看详情

「flink」事件时间与水印(代码片段)

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。获取窗口开始时间Flink源代码获取窗口的开始时间为以下代码:org.apache.flink.streaming.api.windowing.windows.TimeWindow/***Methodtogetthewindowstartf... 查看详情

flink的类型与序列化(代码片段)

...nk在内部以二进制的格式将数据保存,由于普通的Java对象类型和内部二进制格式不一致,需要一套相互转换机制来进行序列化和反序列化。2.DataStream类型系统2.1物理类型Flink支持的物理类型如下图所示:分为基础类型、数组类型... 查看详情

flink中的时间和窗口(代码片段)

目录6.3窗口(Window)6.3.1窗口的概念 6.3.2窗口的分类6.3.3窗口API概览6.3.4窗口分配器(WindowAssigners)6.3.5窗口函数(WindowFunctions)6.3窗口(Window)在流处理中,我们往往需要面对的是连续不断、无... 查看详情

flink流计算随笔(代码片段)

...)在流上的工作方式与批处理不同。例如,不可能计算流中的所有元素,因为流通常是无限的(×××的)。相反,流上的聚合(计数、和等)是由窗口windows限定作用域的,例如“过去5分钟的计数”或“最后100个元素的总和”。Windows可... 查看详情

利用flink消费kafka数据保证全局有序(代码片段)

...topic下又可以由多个partition构成。有时候我们在消费kafka中的数据想要保证消费kafka中的所有的分区下数据是全局有序的,这种情况下就需要将topic下的partition的数量设置为一个这样才会保证全局有序,但是这种情况消费数据并没... 查看详情

flink系列之:基于scala语言实现flink实时消费kafkatopic中的数据(代码片段)

Flink系列之:基于scala语言实现flink实时消费KafkaTopic中的数据一、引入flink相关依赖二、properties保存连接kafka的配置三、构建flink实时消费环境四、添加Kafka源和处理数据五、完整代码六、执行程序查看消费到的数据一、引入fli... 查看详情

flink学习(十四)flink窗口时间和水位线(代码片段)

...nk支持如下3种:滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;滑动窗口,窗口数据有固定的大小,并且有生成间隔;会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加。 &nb... 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

目录1Fink中的时间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代... 查看详情

flink总结之一文彻底搞懂时间和窗口(代码片段)

...(Watermark)1.什么是水位线2.水位线分类1.有序流中的水位线1、如何使用2.乱序流中的水位线2.1乱序流中如何保证数据的准确性2.2如何使用三、窗口1.什么是窗口2.窗口分类1.按照驱动类型分1.计数窗口(CountWindow)2.... 查看详情