flink实现自定义滑动窗口(代码片段)

JasonLee-后厂村程序员 JasonLee-后厂村程序员     2022-11-29     651

关键词:

背景

一般情况下 Flink 提供的窗口可以满足我们大部分的场景,但是有时候我们需要计算一个固定时间范围内的数据,比如实时计算每天凌晨到第二天凌晨的数据,或者每天上午 7 点到第二天上午 7 点。类似于这种情况 Flink 默认提供的窗口是不支持的,因为 Flink 计算窗口的开始时间和结束时间是根据数据本身携带的时间戳然后把数据划分到不同的窗口的,所以它不是一个固定的范围。这个时候就需要我们自己实现窗口划分的逻辑。Flink 提供了 WindowAssigner 抽象类,我们只需要实现 assignWindows 方法即可。

WindowAssigner 源码

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable 
    private static final long serialVersionUID = 1L;

    /**
     * Returns a @code Collection of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The @link WindowAssignerContext in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);

    /** Returns the default trigger associated with this @code WindowAssigner. */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a @link TypeSerializer for serializing windows that are assigned by this @code
     * WindowAssigner.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns @code true if elements are assigned to windows based on event time, @code false
     * otherwise.
     */
    public abstract boolean isEventTime();

其中 assignWindows 方法决定了一条数据应该划分到几个窗口里面,getDefaultTrigger 返回和 WindowAssigner 相关联的默认触发器,决定何时触发窗口计算,getWindowSerializer 返回窗口的序列化器,isEventTime 返回是否是 eventtime 时间语义。

自定义 MyEventTimeWindow 实现

package flink.streaming.window;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.List;

/**
 * 自定义实现 window
 */
public class MyEventTimeWindow extends WindowAssigner<Object, TimeWindow> 

    // 窗口的大小
    private final long size;
    // 多长时间滑动一次
    private final long slide;
    // 窗口偏移量
    private final long offset;

    protected MyEventTimeWindow(long size, long slide, long offset) 
        this.size = size;
        this.slide = slide;
        this.offset = offset;
    

    public static MyEventTimeWindow of(Time size, Time slide, Time offset) 
        return new MyEventTimeWindow(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
    

    public static MyEventTimeWindow of(Time size, Time slide) 
        return new MyEventTimeWindow(size.toMilliseconds(), slide.toMilliseconds(), 0L);
    

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext windowAssignerContext) 
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(timestamp);
        // 设置从每天的0点开始计算
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        // 获取窗口的开始时间 其实就是 0 点
        long winStart = calendar.getTimeInMillis();
        // 获取窗口的结束时间,就是在开始时间的基础上加上窗口的长度 这里是 1 天
        calendar.add(Calendar.DATE, 1);
        // 获取窗口的结束时间 其实就是第二天的 0 点
        long winEnd = calendar.getTimeInMillis() + 1;
        String format = String.format("window的开始时间:%s,window的结束时间:%s", winStart, winEnd);
        System.out.println(format);
        // 当前数据所属窗口的结束时间
        long currentWindowEnd = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide) + slide;
        System.out.println(TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide) + "====" + currentWindowEnd);
        // 一条数据属于几个窗口 因为是滑动窗口一条数据会分配到多个窗口里
        int windowCounts = (int) ((winEnd - currentWindowEnd) / slide);
        List<TimeWindow> windows = new ArrayList<>(windowCounts);
        long currentEnd = currentWindowEnd;
        if (timestamp > Long.MIN_VALUE) 
            while (currentEnd < winEnd) 
                windows.add(new TimeWindow(winStart, currentEnd));
                currentEnd += slide;
            
        
        return windows;
    

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) 
        return EventTimeTrigger.create();
    

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) 
        return new TimeWindow.Serializer();
    

    @Override
    public boolean isEventTime() 
        return true;
    

这里主要有两个问题:

1,数据被分到几个窗口?窗口的长度 / 窗口滑动的步长 = 窗口的个数。

2,窗口的开始时间和结束时间怎么计算?对应的 TimeWindow#getWindowStartWithOffset 方法。

getWindowStartWithOffset 源码

/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) 
    return timestamp - (timestamp - offset + windowSize) % windowSize;

窗口的开始时间主要就是通过上面的算法计算而来,有了窗口的开始时间,那结束时间就非常简单了,直接加上窗口的大小就好了。

验证结果

window = ds.window(MyEventTimeWindow.of(Time.days(1), Time.hours(1)))
window的开始时间:1639238400000(2021-12-12 00:00:00),window的结束时间:1639324800001(2021-12-13 00:00:00)

3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639242000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639245600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639249200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639252800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639256400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639260000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639263600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639267200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639270800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639274400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639278000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639281600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639285200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639288800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639292400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639296000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639299600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639303200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639306800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639310400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639314000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639317600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639321200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639324800000)

窗口的开始时间是 2021-12-12 00:00:00,窗口的结束时间是 2021-12-13 00:00:00。窗口的长度是 24 小时,每隔 1 小时滑动一次,所以一条数据会分配到 24 个窗口里面,所以触发了 24 个窗口计算,结果也没有问题,这样就实现了任意时间的滑动窗口。

推荐阅读

Flink 任务实时监控最佳实践

Flink on yarn 实时日志收集最佳实践

Flink 1.14.0 全新的 Kafka Connector

Flink 1.14.0 消费 kafka 数据自定义反序列化类

Flink SQL JSON Format 源码解析

如果你觉得文章对你有帮助,麻烦点一下在看吧,你的支持是我创作的最大动力.

flink自定义窗口分配器周月(代码片段)

...sionwindowsandglobalwindows.同时还可以通过扩展WindowAssigner类来实现自定义窗口assigner。所有内置的窗口分配程序(除了globalwindows)都根据时间将元素分配给窗口,时间可以是处理时间,也可以是事件时间。基于时间的窗口有一个开始时... 查看详情

flink滑动窗口使用触发器会触发多个窗口的计算(代码片段)

...这样呢?写了个小案例,来解释这种情况为了方便使用自定义的source开发数据:classStringSourceFunctionextendsSourceFunction[String]varflag=trueoverridede 查看详情

flink滚动窗口滑动窗口会话窗口全局窗口(代码片段)

...ndows)前言  根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(TumblingWindow)、滑动窗口(SlidingWindow)、会话窗口(SessionWindow),以及全局窗口(GlobalWindow)1.滚动窗... 查看详情

11.flink四大基石window窗口的分类flink提供了很多各种场景用的windowassigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动(代码片段)

...话窗口11.Flink四大基石Checkpoint基于Chandy-Lamport算法,实现了分布式一致性快照,提供了一致性的语义。State丰富的StateAPI。ValueState,ListState,MapState,BroadcastState。Time实现了Watermark机制。乱序数据处理,迟到数据容忍。Window... 查看详情

flink滚动窗口滑动窗口详解(代码片段)

...口,而且只会属于一个窗口。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(windowsize)。比如我们可以定义一个长度为1小时的滚动时间窗口,那么每... 查看详情

flink自定义触发器定时或达到数量触发(代码片段)

...触发计算,然后清除窗口中的元素。(默认情况下,预先实现的触发器只触发而不清除窗口状态。)案例需求当窗口中的数据量达到一定数量的时候触发计算根据执行时间每隔一定 查看详情

flink基于java的wordcount,根据滑动窗口动态排序实现(代码片段)

背景刚学习Flink没多久,之前用Spark做WordCount实现排序很简单,因为Spark做的WC是简单的批处理,直接排序就完事了,但是Flink的流处理需要考虑到状态(Stateful),并且时间语义我选择的是ProcessingTimeÿ... 查看详情

flink原理与实现:sessionwindow(代码片段)

...章:Window机制中,我们介绍了窗口的概念和底层实现,以及Flink一些内建的窗口,包括滑动窗口、翻滚窗口。本文将深入讲解一种较为特殊的窗口:会话窗口(sessionwindow)。建议您在阅读完上一篇文章... 查看详情

flink原理与实现:sessionwindow(代码片段)

...章:Window机制中,我们介绍了窗口的概念和底层实现,以及Flink一些内建的窗口,包括滑动窗口、翻滚窗口。本文将深入讲解一种较为特殊的窗口:会话窗口(sessionwindow)。建议您在阅读完上一篇文章... 查看详情

flink自定义metrics监控kafka消费(代码片段)

...在网上找了很久没找到直接能用的代码。在这里把自己的实现记录一下。有部分代码引用了:Flink监控:自定义消费延迟Metrics二实现1. CustomerJsonDeserializationimportorg.apache.commons.lang3.ThreadU 查看详情

flink自定义sqlconnector(代码片段)

...不需要用户自己定义。但是在某些特殊的情况下需要手动实现针对实际场景的sqlconnector。最近在实践中遇到了两个比较极端的场景,无法通过简单的sqlconnector实现:业务1:逻辑上将数据分写入到一个flinksqltable,物理上分发到多个表。... 查看详情

flink1.7自定义source实现(代码片段)

...ource(sourceFunction)将源添加到程序中。flink附带大量预先实现好的各种读取数据源的函数,也可以通过为非并行源去实现SourceFunction接口或者为并行源实现ParallelSourceFunction接口或扩展RichParallelSourceFunction来编写满足自己业务需要... 查看详情

4flink自定义sourcesink(代码片段)

...fromElements(1,2,3)env.generateSequence(0,1000)1.2、自定义数据源1、实现SourceFunctionSourceFunction是非并行的,所以不能指定并行度,即不能用setParallelism(num)算子;SocketTextStreamFunction就是实现的SourceFunction,源码中也有详细的用例;importorg.apac... 查看详情

flink中自定义rich函数实现(代码片段)

...,我们知道map,flatMap,reduce算子都可以自定义函数实现,比如MapFunction:publicclassMyMapFunctionimplementsMapFunction<String,Integer>@OverridepublicIntegermap(Strings)throwsExceptionreturnInteger.parseInt(s);同时,Flink中还提供了对应的Rich... 查看详情

flink的窗口计算案例(代码片段)

...的分类按照时间生成Window,为TimeWindow,根据窗口实现原理可分为三类:滚动窗口(TumblingWindow):将数据依据固定的窗口长度对数据进行分片。滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动... 查看详情

flink的窗口聚合操作(timecountwindow)(代码片段)

...aggregate从整体上分为3种类型:TimeWindow、CountCindow和自定义Window,其中每一种Window从粒度上又细分为滚动窗口(tumblingwindows)、滑动窗口(slidingwindows)和Session回话窗口。 无界窗口案列说明:packageFlink_... 查看详情

flinkaggregatefunction窗口函数,自定义udaf,udf(代码片段)

AggregateFunction比ReduceFunction更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中的元素类型,AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有... 查看详情

flinkaggregatefunction窗口函数,自定义udaf,udf(代码片段)

AggregateFunction比ReduceFunction更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中的元素类型,AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有... 查看详情