flink中的时间和窗口(代码片段)

JiaXingNashishua JiaXingNashishua     2022-10-23     152

关键词:

目录

6.1 时间语义

6.1.1 Flink 中的时间语义

6.1.2 哪种时间语义更重要

6.2 水位线(Watermark)

6.2.1 事件时间和窗口

6.2.2 什么是水位线

6.2.3 如何生成水位线

6.2.4 水位线的传递

 6.2.5 水位线的总结


6.1 时间语义

6.1.1 Flink 中的时间语义

Flink 是一个分布式处理 系统。分布式架构最大的特点,就是节点彼此独立、互不影响,这带来了更高的吞吐量和容错 性;但有利必有弊,最大的问题也来源于此。

在分布式系统中,节点“各自为政”,是没有统一时钟的,数据和控制信息都通过网络进行传输。比如现在有一个任务是窗口聚合,我们希望将每个小时的数据收集起来进行统计处理。 而对于并行的窗口子任务,它们所在节点不同,系统时间也会有差异;当我们希望统计 8 点~9 点的数据时,对并行任务来说其实并不是“同时”的,收集到的数据也会有误差。

那既然一个集群中有 JobManager 作为管理者,是不是让它统一向所有 TaskManager 发送 同步时钟信号就行了呢?这也是不行的。因为网络传输会有延迟,而且这延迟是不确定的,所 以 JobManager 发出的同步信号无法同时到达所有节点;想要拥有一个全局统一的时钟,在分布式系统里是做不到的。

很明显,这里有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。 我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后

 1. 处理时间(Processing Time)

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

如果我们以它作为衡量标准,那么数据属于哪个窗口就很明显了:只看窗口任务处理这条 数据时,当前的系统时间。比如之前举的例子,数据 8 点 59 分 59 秒产生,而窗口计算时的时 间是 9 点零 1 秒,那么这条数据就属于 9 点—10 点的窗口;如果数据传输非常快,9 点之前就 到了窗口任务,那么它就属于 8 点—9 点的窗口了。每个并行的窗口子任务,就只按照自己的 系统时钟划分窗口。假如我们在早上 8 点 10 分启动运行程序,那么接下来一直到 9 点以前处 理的所有数据,都属于第一个窗口;9 点之后、10 点之前的所有数据就将属于第二个窗口。

这种方法非常简单粗暴,不需要各个节点之间进行协调同步,也不需要考虑数据在流中的 位置,简单来说就是“我的地盘听我的”。所以处理时间是最简单的时间语义。

2. 事件时间(Event Time)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实 就是这条数据记录的“时间戳”(Timestamp)。

在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。打个比方,这相当于任务处理的时候自己本身是没有时钟的,所以只好来一个数据就 问一下“现在几点了”;而数据本身也没有表,只有一个自带的“出厂时间”,于是任务就基于这 个时间来确定自己的时钟。由于流处理中数据是源源不断产生的,一般来说,先产生的数据也 会先被处理,所以当任务不停地接到数据时,它们的时间戳也基本上是不断增长的,就可以代 表时间的推进。

当然我们会发现,这里有个前提,就是“先产生的数据先被处理”,这要求我们可以保证 数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的 数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要 用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。 关于水位线的概念和用法,我们会稍后介绍。

6.1.2 哪种时间语义更重要

1. 数据处理系统中的时间语义

在计算机系统中,考虑数据处理的“时代变化”是没什么意义的,我们更关心的,显然是 数据本身产生的时间。

比如我们计算网站的 PV、UV 等指标,要统计每天的访问量。如果某个用户在 23 点 59分 59 秒有一次访问,但我们的任务处理这条数据的时间已经是第二天 0 点 0 分 01 秒了;那么 这条数据,是应该算作当天的访问,还是第二天的访问呢?很明显,统计用户行为,需要考虑 行为本身发生的时间,所以我们应该把这条数据统计入当天的访问量。这时我们用到的窗口, 就是以事件时间作为划分标准的,跟处理时间无关。

所以在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。

2. 两种时间语义的对比

实际应用中,数据产生的时间和处理的时间可能是完全不同的。很长时间收集起来的数据, 处理或许只要一瞬间;也有可能数据量过大、处理能力不足,短时间堆了大量数据处理不完, 产生“背压”(back pressure)。

通常来说,处理时间是我们计算效率的衡量标准,而事件时间会更符合我们的业务计算逻 辑。所以更多时候我们使用事件时间;不过处理时间也不是一无是处。对于处理时间而言,由 于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让我们的流处理延迟降到最低, 效率达到最高。

但是我们前面提到过,在分布式环境中,处理时间其实是不确定的,各个并行任务时钟不 统一;而且由于网络延迟,导致数据到达各个算子任务的时间有快有慢,对于窗口操作就可能 收集不到正确的数据了,数据处理的顺序也会被打乱。这就会影响到计算结果的正确性。所以 处理时间语义,一般用在对实时性要求极高、而对计算准确性要求不太高的场景。

而在事件时间语义下,水位线成为了时钟,可以统一控制时间的进度。这就保证了我们总可以将数据划分到正确的窗口中,比如 8 点 59 分 59 秒产生的数据,无论网络传输的延迟是多 少,它永远属于 8 点~9 点的窗口,不会错分。但我们知道数据还可能是乱序的,要想让窗口 正确地收集到所有数据,就必须等这些错乱的数据都到齐,这就需要一定的等待时间。所以整 体上看,事件时间语义是以一定延迟为代价,换来了处理结果的正确性。由于网络延迟一般只 有毫秒级,所以即使是事件时间语义,同样可以完成低延迟实时流处理的任务。

另外,除了事件时间和处理时间,Flink 还有一个“摄入时间”(Ingestion Time)的概念, 它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是 事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添 加到数据里。这样一来,水位线(watermark)也就基于这个时间直接生成,不需要单独指定 了。这种时间语义可以保证比较好的正确性,同时又不会引入太大的延迟。它的具体行为跟事 件时间非常像,可以当作特殊的事件时间来处理。

在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 1.12 版本开始,Flink 已经将事件时间作为了默认的时间语 义。

6.2 水位线(Watermark)

6.2.1 事件时间和窗口

在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。所 以在介绍水位线之前,我们首先来梳理一下事件时间和窗口的关系。

一个数据产生的时刻,就是流处理中事件触发的时间点,这就是“事件时间”,一般都会以时间戳的形式作为一个字段记录在数据里。这个时间就像商品的“生产日期”一样,一旦产 生就是固定的,印在包装袋上,不会因为运输辗转而变化。如果我们想要统计一段时间内的数 据,需要划分时间窗口,这时只要判断一下时间戳就可以知道数据属于哪个窗口了。

明确了一个数据的所属窗口,还不能直接进行计算。因为窗口处理的是有界数据,我们需要等窗口的数据都到齐了,才能计算出最终的统计结果。那什么时候数据就都到齐了呢?对于 时间窗口来说这很明显:到了窗口的结束时间,自然就应该收集到了所有数据,就可以触发计 算输出结果了。比如我们想统计 8 点~9 点的用户点击量,那就是从 8 点开始收集数据,到 9点截止,将收集的数据做处理计算。这有点类似于班车,如图 6-3 所示,每小时发一班,那么8 点之后来的人都会上同一班车,到 9 点钟准时发车;9 点之后来的人,就只好等下一班 10点发的车了。

在处理时间语义下,都是以当前任务所在节点的系统时间为准的。这就相当于每辆车里都 挂了一个钟,司机看到到了 9 点就直接发车。这种方式简单粗暴容易实现,但因为车上的钟是 独立运行的,以它为标准就不能准确地判断商品的生产时间。在分布式环境下,这样会因为网 络传输延迟的不确定而导致误差。比如有些商品在 8 点 59 分 59 秒生产出来,可是从下生产线 到运至车上又要花费几秒,那就赶不上 9 点钟这班车了。而且现在分布式系统中有很多辆 9点发的班车,所以同时生产出的一批商品,需要平均分配到不同班车上,可这些班车距离有近 有远、上面挂的钟有快有慢,这就可能导致有些商品上车了、有些却被漏掉;先后生产出的商 品,到达车上的顺序也可能乱掉:统计结果的正确性受到了影响。

所以在实际中我们往往需要以事件时间为准。如果考虑事件时间,情况就复杂起来了。现 在不能直接用每辆车上挂的钟(系统时间),又没有统一的时钟,那该怎么确定发车时间呢?

现在能利用的,就只有商品的生产时间(数据的时间戳)了。我们可以这样思考:一般情 况下,商品生产出来之后,就会立即传送到车上;所以商品到达车上的时间(系统时间)应该稍稍滞后于商品的生产时间(数据时间戳)。如果不考虑传输过程的一点点延迟,我们就可以 直接用商品生产时间来表示当前车上的时间了。如图 6-4 所示,到达车上的商品,生产时间是8 点 05 分,那么当前车上的时间就是 8 点 05 分;又来了一个 8 点 10 分生产的商品,现在车 上的时间就是 8 点 10 分。我们直接用数据的时间戳来指示当前的时间进展,窗口的关闭自然 也是以数据的时间戳等于窗口结束时间为准,这就相当于可以不受网络传输延迟的影响了。像 之前所说 8 点 59 分 59 秒生产出来的商品,到车上的时候不管实际时间(系统时间)是几点, 我们就认为当前是 8 点 59 分 59 秒,所以它总是能赶上车的;而 9 点这班车,要等到 9 点整生 产的商品到来,才认为时间到了 9 点,这时才正式发车。这样就可以得到正确的统计结果了。

在这个处理过程中,我们其实是基于数据的时间戳,自定义了一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。比如双十一的时候系统处理压力大,我们可能会把大量数据缓存在 Kafka中;过了高峰时段之后再读取出来,在几秒之内就可以处理完几个小时甚至几天的数据,而且依然可以按照数据产生的时间段进行统计,所有窗口都能收集到正确的数据。而一般实时流处 理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口 计算的正确性。

6.2.2 什么是水位线

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟, 用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数 据的时间戳来驱动的。

在分布式系统中,这种驱动方式又会有一些问题。因为数据本身在处理转换的过程中会 变化,如果遇到窗口聚合这样的操作,其实是要攒一批数据才会输出一个结果,那么下游的数据就会变少,时间进度的控制就不够精细了。另外,数据向下游任务传递时,一般只能传输给一个子任务(除广播外),这样其他的并行子任务的时钟就无法推进了。例如一个时间戳为 9点整的数据到来,当前任务的时钟就已经是 9 点了;处理完当前数据要发送到下游,如果下游任务是一个窗口计算,并行度为 3,那么接收到这个数据的子任务,时钟也会进展到 9 点,9点结束的窗口就可以关闭进行计算了;而另外两个并行子任务则时间没有变化,不能进行窗口计算。

所以我们应该把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个 时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间这个标记可以直接广播到下游,当下游任务收到这个标记,就 可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事 件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

如图 6-5 所示,每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。 这里没有指定单位,可以理解为秒或者毫秒(方便起见,下面讲述统一认为是秒)。当产生于2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线, 随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间 戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们 只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。

水位线就像它的名字所表达的,是数据流中的一部分,随着数据一起流动,在不同任务之间传输。这看起来非常简单;接下来我们就进一步探讨一些复杂的状况。

1. 有序流中的水位线

理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间 戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推 进。

实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一 条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数 据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般 会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图6-6 所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。

 这里需要注意的是,水位线插入的“周期”,本身也是一个时间概念。在当前事件时间语 义下,假如我们设定了每隔 100ms 生成一次水位线,那就是要等事件时钟推进 100ms 才能插入;但是事件时钟本身的进展,本身就是靠水位线来表示的——现在要插入一个水位线,可前 提又是水位线要向前推进 100ms,这就陷入了死循环。所以对于水位线的周期性生成周期时间是指处理时间(系统时间),而不是事件时间。

2. 乱序流中的水位线

有序流的处理非常简单,看起来水位线也并没有起到太大的作用。但这种情况只存在于理 想状态下。我们知道在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性, 导致顺序发生改变,这就是所谓的“乱序数据”。

这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产 生时间而言的。如图 6-7 所示,一个 7 秒时产生的数据,生成时间自然要比 9 秒的数据早;但 是经过数据缓存和传输之后处理任务可能先收到了 9 秒的数据,之后 7 秒的数据才姗姗来迟。 这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?

最直观的想法自然是跟之前一样,我们还是靠数据来驱动,每来一个数据就提取它的时间 戳、插入一个水位线。不过现在的情况是数据乱序,所以有可能新的时间戳比之前的还小,如 果直接将这个时间的水位线再插入,我们的“时钟”就回退了——水位线就代表了时钟,时光 不能倒流,所以水位线的时间戳也不能减小。

解决思路也很简单:

我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则 就不再生成新的水位线,如图 6-8 所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

 

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线

这样做尽管可以定义出一个事件时钟,却也会带来一个非常大的问题:我们无法正确处理 “迟到”的数据。在上面的例子中,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0~9 秒的所有数据),那么这时窗口就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有 时间戳为 7 秒、8 秒的数据在 9 秒的数据之后才到来,这就是“迟到数据”(late data)。它们 本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确。

为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是 用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳,如图 6-10 所示。这 样的话,9 秒的数据到来之后,事件时钟不会直接推进到 9 秒,而是进展到了 7 秒;必须等到11 秒的数据到来之后,事件时钟才会进展到 9 秒,这时迟到数据也都已收集齐,0~9 秒的窗 口就可以正确计算结果了。

为了防止“迟到数据”还没有传输过来,我们可以选择再多等几秒,以计算出正确的答案。

 3. 水位线的特性

我们可以总结一下水位线的特性:

⚫ 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据

⚫ 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

⚫ 水位线是基于数据的时间戳生成的

⚫ 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

⚫ 水位线可以通过设置延迟,来保证正确处理乱序数据

⚫ 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之 前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对 乱序数据的正确处理。

6.2.3 如何生成水位线

1. 生成水位线的总体原则

我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,我们只能 尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,我们该 怎么做呢?

可以单独创建一个 Flink 作业来监控事件流,建立概率分布或者机器学习模型,学习事件的迟到规律。得到分布规律之后,就可以选择置信区间来确定延迟,作为水位线的生成策略了。例如,如果得到数据的迟到时间服从μ=1,σ=1 的正态分布,那么设置水位线延迟为 3 秒,就可以保证至少 97.7%的数据可以正确处理。

如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下, 可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。对于这 些 “漏网之鱼”,Flink 另外提供了窗口处理迟到数据的方法,我们会在后面介绍。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

2. 水位线生成策略(Watermark Strategies)

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线 来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( 
 WatermarkStrategy<T> watermarkStrategy)

具体使用时,直接用 DataStream 调用该方法即可,与普通的 transform 方法完全一样。

DataStream<Event> stream = env.addSource(new ClickSource()); 
DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(<watermark strategy>); 

这里需要将原始的时间戳提取出来,如果不提取出来并明确把它分配给数据,Flink是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息, 比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配 了。

.assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就 是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy 中 包 含 了 一 个 “ 时间戳分配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator。

⚫ TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给 元素。时间戳的分配是生成水位线的基础。

⚫ WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在

WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。

⚫ onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳, 以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作

⚫ onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间 为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。

env.getConfig().setAutoWatermarkInterval(60 * 1000L); 

3. Flink 内置水位线生成器

WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;但看起来有些复杂,如果想要自己实现应该还是比较麻烦的。好在 Flink 充分考虑到了 我们的痛苦,提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程, 而且也为我们自定义水位线策略提供了模板。

这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

(1)有序流

对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以 永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。


                有序:
                调用.withTimestampAssigner()方法,将数据中的 timestamp 字段提取出来,
                作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。
                这样,提取出的数据时间戳,就是我们处理计算的事件时间。 
                这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。
                 
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() 
                            @Override
                            public long extractTimestamp(Event event, long l) 
                                return event.timestamp;
                            
                        )
                )
               

(2)乱序流

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参 数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序 程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

//乱序流的WaterMark生成

                //1.WaterMark的生成器
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() 
                            @Override
                            //2.时间戳的提取器
                            public long extractTimestamp(Event event, long l) 
                                return event.timestamp;
                            
                        )
                );

上面代码中,我们同样提取了 timestamp 字段作为时间戳,并且以 5 秒的延迟时间创建了 处理乱序流的水位线生成器。事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水 位线生成器,两者完全等同:

WatermarkStrategy.forMonotonousTimestamps() 
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)) 

4. 自定义水位线策略

一般来说,Flink 内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑 可能非常复杂,这时对水位线生成的逻辑也有更高的要求,我们就必须自定义实现水位线策略WatermarkStrategy 了。

在 WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提 取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水 位线。

下面是一段自定义周期性生成水位线的代码:

import org.apache.flink.api.common.eventtime.*; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
 
// 自定义水位线的产生 
public class CustomWatermarkTest  
 public static void main(String[] args) throws Exception  
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
 env.setParallelism(1); 
 
 env 
 .addSource(new ClickSource()) 
 .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()) 
 .print(); 
 
 env.execute(); 
  
 
 public static class CustomWatermarkStrategy implements WatermarkStrategy<Event>  
 @Override 
 public TimestampAssigner<Event> 
createTimestampAssigner(TimestampAssignerSupplier.Context context)  
 return new SerializableTimestampAssigner<Event>()  
 @Override 
136 
 

 public long extractTimestamp(Event element, long recordTimestamp) 
 
 return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段 
  
 ; 
  
 
 @Override 
 public WatermarkGenerator<Event> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context)  
 return new CustomPeriodicGenerator(); 
  
  
 
 public static class CustomPeriodicGenerator implements 
WatermarkGenerator<Event>  
 private Long delayTime = 5000L; // 延迟时间 
 private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳 
 
 @Override 
 public void onEvent(Event event, long eventTimestamp, WatermarkOutput 
output)  
 // 每来一条数据就调用一次 
 maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳 
  
 
 @Override 
 public void onPeriodicEmit(WatermarkOutput output)  
 // 发射水位线,默认 200ms 调用一次 
 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); 
  
  
 

我们在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;这个方法 由系统框架周期性地调用,默认 200ms 一次。所以水位线的时间戳是依赖当前已有数据的最 大时间戳的(这里的实现与内置生成器类似,也是减去延迟时间再减 1),但具体什么时候生 成与数据无关。

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

自定义的断点式水位线生成器代码如下:

public class CustomPunctuatedGenerator implements WatermarkGenerator<Event>  

 @Override 
 public void onEvent(Event r, long eventTimestamp, WatermarkOutput output)  
// 只有在遇到特定的 itemId 时,才发出水位线 
 if (r.user.equals("Mary"))  
 output.emitWatermark(new Watermark(r.timestamp - 1)); 
  
  
 
 @Override 
 public void onPeriodicEmit(WatermarkOutput output)  
 // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线 
  
 

我们在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一 定在某个数据到来之后。

5. 在自定义数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自 定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方 法 来 生 成 水 位 线 了 。 在 自 定 义 数 据 源 中 生 成 水 位 线 和 在 程 序 中 使 用

assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.source.SourceFunction; 
import org.apache.flink.streaming.api.watermark.Watermark; 
 
 
import java.util.Calendar; 
import java.util.Random; 
 
public class EmitWatermarkInSourceFunction  
 public static void main(String[] args) throws Exception  
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
 env.setParallelism(1); 
 
 env.addSource(new ClickSourceWithWatermark()).print(); 
 
 env.execute(); 
  
 
 // 泛型是数据源中的类型 
 public static class ClickSourceWithWatermark implements SourceFunction<Event> 
 
 private boolean running = true; 
 @Override 
 public void run(SourceContext<Event> sourceContext) throws Exception  
 Random random = new Random(); 
 String[] userArr = "Mary", "Bob", "Alice"; 
 String[] urlArr = "./home", "./cart", "./prod?id=1"; 
 while (running)  
 long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时
间戳 
 String username = userArr[random.nextInt(userArr.length)]; 
 String url = urlArr[random.nextInt(urlArr.length)]; 
 Event event = new Event(username, url, currTs); 
 // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段 
 sourceContext.collectWithTimestamp(event, event.timestamp); 
 // 发送水位线 
 sourceContext.emitWatermark(new Watermark(event.timestamp - 1L)); 
 Thread.sleep(1000L); 
  
  
 
 @Override 
 public void cancel()  
 running = false; 
  
  
 
 

6.2.4 水位线的传递

(1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”

(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最 小的那个。

(2)当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分 区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当 前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区 水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任 务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给 下游所有子任务。

(3)再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区 时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,140也不会向下游任务发出水位线。

(4)同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间 戳为 4 的水位线,广播到下游各个分区任务。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题, 每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果 总是正确的。对于有多条流合并之后进行处理的场景,水位线传递的规则是类似的。

 6.2.5 水位线的总结

水位线在事件时间的世界里面,承担了时钟的角色。也就是说在事件时间的流中,水位线 是唯一的时间尺度。如果想要知道现在几点,就要看水位线的大小。后面讲到的窗口的闭合, 以及定时器的触发都要通过判断水位线的大小来决定是否触发。

水位线是一种特殊的事件,由程序员通过编程插入的数据流里面,然后跟随数据流向下游 流动。

水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。

数据流开始之前Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE) 的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保 证所有的窗口闭合以及所有的定时器都被触发。

对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种 情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算 的正确,无需在数据流的中间插入水位线了。

flink学习(十四)flink窗口时间和水位线(代码片段)

...nk支持如下3种:滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;滑动窗口,窗口数据有固定的大小,并且有生成间隔;会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加。 &nb... 查看详情

flink中的时间和窗口(代码片段)

借鉴《尚硅谷Flink1.13版本笔记.pdf》中第六章Flink中的时间和窗口在流数据处理应用中,一个很重要的操作就是窗口计算。所谓的“窗口”,就是划定的一段时间范围,也就是“时间窗”;对这范围内数据进行处理&... 查看详情

「flink」flink中的时间类型(代码片段)

Flink中的时间类型和窗口是非常重要概念,是学习Flink必须要掌握的两个知识点。Flink中的时间类型时间类型介绍Flink流式处理中支持不同类型的时间。分为以下几种:处理时间Flink程序执行对应操作的系统时间。所有基于时间的... 查看详情

flink总结之一文彻底搞懂时间和窗口(代码片段)

...(Watermark)1.什么是水位线2.水位线分类1.有序流中的水位线1、如何使用2.乱序流中的水位线2.1乱序流中如何保证数据的准确性2.2如何使用三、窗口1.什么是窗口2.窗口分类1.按照驱动类型分1.计数窗口(CountWindow)2.... 查看详情

flink窗口window(代码片段)

...时间分类  EventTime:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。  IngestionTime:是数据进入Flink的时间。  Proc... 查看详情

flink原理学习窗口和时间(代码片段)

...生的时间,一旦确定就不会发生变化。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。ProcessingTime:即处理时间,是... 查看详情

flink原理学习窗口和时间(代码片段)

...生的时间,一旦确定就不会发生变化。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。ProcessingTime:即处理时间,是... 查看详情

flink窗口window机制详解(代码片段)

...乱序处理,和checkpoint机制)。本文我们将介绍流式处理中的窗口概念,介绍Flink内建的一些窗口和Windo 查看详情

flink和sentinel中计算当前时间所属的时间窗口算法(代码片段)

flink中:org.apache.flink.streaming.api.windowing.windows.TimeWindowpublicstaticlonggetWindowStartWithOffset(longtimestamp,longoffset,longwindowSize)//转载请标明链接:https://blog.csdn.net/wabiaoz 查看详情

flink笔记9[实验]体验窗口开启时间和关闭时间(eventtime)(代码片段)

体验窗口开启时间和关闭时间 实验数据实验代码实验结果实验分析窗口开始时间公式实验数据sensor_1,1619492107,36.2sensor_1,1619492108,36.0sensor_1,1619492109,36.5sensor_1,1619492110,34.3sensor_1,1619492111,34.3sensor_1,1619492112,34.3sensor_1,16194 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

目录1Fink中的时间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代... 查看详情

flink中window窗口和时间以及watermark水印(代码片段)

...计,不是按照时间,比如countWindow(100)表示当窗口中的数据有100个的时候开始计算SlidingCountWindow:累积固定个数的数据就作为一个窗口,超过指定数量个数数据开启新一个窗口计算,比如coutnWindow(100,10)窗口大小是100&... 查看详情

flink窗口和水位线(代码片段)

...的话,水位线还有一个功能就是可以控制窗口的关闭时间窗口分类时间窗口: 查看详情

flink实现自定义滑动窗口(代码片段)

...们大部分的场景,但是有时候我们需要计算一个固定时间范围内的数据,比如实时计算每天凌晨到第二天凌晨的数据,或者每天上午7点到第二天上午7点。类似于这种情况Flink默认提供的窗口是不支持的,因为Flink... 查看详情

flink流计算随笔(代码片段)

...)在流上的工作方式与批处理不同。例如,不可能计算流中的所有元素,因为流通常是无限的(×××的)。相反,流上的聚合(计数、和等)是由窗口windows限定作用域的,例如“过去5分钟的计数”或“最后100个元素的总和”。Windows可... 查看详情

11.flink四大基石window窗口的分类flink提供了很多各种场景用的windowassigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动(代码片段)

...nk提供了很多各种场景用的WindowAssigner12.3.代码演示–基于时间的滚动和滑动–掌握12.3.1.本地ubuntu下安装nc12.3.2.基于时间的滚动和滑动窗口12.4.代码演示-基于数量的滚动和滑动12.5.代码演示–Session会话窗口11.Flink四大基石Checkpoint... 查看详情

flink窗口函数处理数据(watermark和sideoutput)(代码片段)

...重要的解决思路,1.添加水位线Watermark2.推迟关闭窗口时间3.超时数据的side输出下面的例子是 查看详情

12-flink-1.10.1-flink中的时间语义和watermark(代码片段)

目录1Fink中的时间语义1.1哪种时间语义更重要2设置EventTime3水位线(Watermark)3.1水位线概念3.2watermark原理和特点4watermark的传递,引入和设置4.1 watermark的传递 4.2watermark代码中引入 4.3自定义watermark4.4 watermark的实例代... 查看详情