flink基于flinkcep实时计算商品订单流失量(代码片段)

九师兄 九师兄     2022-12-05     121

关键词:

在这里插入图片描述

1.概述

转载:https://blog.csdn.net/tzs_1041218129/article/details/108786597

假设有个需求需要实时计算商品的订单流失量,规则如下:

用户点击商品 A,但购买了同类商品 B,则商品 A 记为一次订单流失量;

点击商品 A 到购买同类商品 B 的有效时间窗口应该小于 12 个小时;

有效窗口内多次点击商品 A 视为一次订单流失。

第三条规则可以理解为数据流去重,我在上一节已经介绍过了。为了更加专注于计算商品的订单流失量,本篇文章不再关注数据去重。

看到这个需求,想到可以用上一节的 ProcessFunction 进行状态管理,比如说基于用户进行分流,然后每个用户维护一个状态和一个有效时间窗口,触发购买同类事件后进行数据统计,过了有效期后舍弃。

但是,有没有更优雅的一点方式呢?

答案是有的,我们可以使用 Flink 自带的 CEP 来实现。

下面先简单介绍下 FlinkCEP,然后给出代码实践。

1.FlinkCEP
1.1 什么是 CEP
CEP 全称为 Complex Event Process,是在 Flink 之上实现的复杂事件处理(CEP)库。它允许你在无界的事件流中检测事件模式,让你有机会掌握数据中重要的事项。

例如:“起床–>洗漱–>吃饭–>上班”这一系列串联起来的事件流形成的模式称为 CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。

再举几个经典例子:

异常检测:打车计费后 12 小时还未结束订单;用户短时间内连续完成多个订单;

实时营销:用户在不同平台进行比价;

数据监控:检测某些指标,比如订单流失量。

1.2 FlinkCEP 原理
FlinkCEP 内部是用 「NFA(非确定有限自动机)「来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为」起始状态」、「中间状态」、「最终状态」三种,边分为 「take」、「ignore」、「proceed」 三种。

「take」:必须存在一个条件判断,当到来的消息满足 take 边条件判断时,把这个消息放入结果集,将状态转移到下一状态。

「ignore」:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。

「proceed」:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条 proceed 边和下游的购买状态相连。

当然,在我们的场景中不会涉及太多复杂的概念。

2.FlinkCEP 简单上手
本节内容引用参考 1,用于完成基本的概念讲解和 Demo 实现。

2.1 单个 Pattern
我们先从简单的内容入手。看看在单个Pattern下,Flink CEP是如何匹配的。

2.1.1 各个API的用法
在学习 Flink CEP 的过程中,很容易找到相似的博文,文章中使用表格列举出了各个 API 的作用。然而大家很容易发现,这东西太像正则表达式了(实际上底层匹配逻辑的实现方式应该也和正则表达式类似)。因此,结合正则表达式理解这些 API 显得十分快速,所以我自作主张,加上了功能相近的正则表达式。例如,我们要用 CEP 匹配字母 x:
在这里插入图片描述

2.1.2 仅使用 where 和 or 写一个程序
比如说,我们现在有一个简单的需求,对于输入的数据流中,匹配所有以 x 或 y 开头的数据:

public class CepDemo 
    public static void main(String[] args) throws Exception 
        var environment = StreamExecutionEnvironment.getExecutionEnvironment();
        var stream = environment.setParallelism(1).addSource(new ReadLineSource("Data.txt"));
 
        // 使用 where 和 or 来定义两个需求;
        // 当然也可以放在一个 where 里。
        var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() 
            @Override
            public boolean filter(String s, Context<String> context) 
                return s.startsWith("x");
            
        ).or(new IterativeCondition<>() 
            @Override
            public boolean filter(String s, Context<String> context) throws Exception 
                return s.startsWith("y");
            
        );
 
        // CEP.pattern 的第一个参数是数据流,第二个是规则;
        // 然后利用 select 方法抽取出匹配到的数据。
        // 这里用了 lambda 表达式
        CEP.pattern(stream, pattern).select((map ->
                Arrays.toString(map.get("start").toArray()))
        ).addSink(new SinkFunction<>() 
            @Override
            public void invoke(String value, Context context) 
                System.out.println(value);
            
        );
        environment.execute();
    

对于输入的数据流:

x1
z2
c3
y4

我们有输出:

读取:x1   
[x1]   
读取:z2   
读取:c3   
读取:y4   
[y4]

可以看到,Flink CEP 可以根据输入的每一条数据进行匹配。单条数据可以是本文中的字符串,也可以是复杂的事件对象,当然也可以是字符。如果每一条数据都是一个字符,那 CEP 就和正则表达式十分相似了。

2.1.3 加上量词
接下来,还是在单个 Pattern 中,我们加上量词 API,研究研究 Flink CEP 是如何匹配多条数据的。从这里开始,事情和正则表达式有了一些差距。差距主要在结果的数量上。由于是流计算,因此在实际处理过程中,Flink 无法知道后续的数据,所以会输出所有匹配的结果。

例如,使用 timesOrMore() 函数,匹配以 a 开头的字符串出现 3 次及以上的情况,首先编写代码(其他代码与上方的例子完全一致,为节约篇幅不再列出,下同):

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() 
    @Override
    public boolean filter(String s, Context<String> context) 
        return s.startsWith("a");
    
).timesOrMore(3);

随后在Data.txt中输入如下字符串序列:

a1
a2
a3
b1
a4

运行程序,输出如下结果:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a1, a2, a3, a4]
[a2, a3, a4]

下面分析一下执行流程。程序开始后,等待数据流入。当a1和a2输入后,由于暂时不满足条件,所以没有产生结果,只是将数据储存在状态中。a3到来后,第一次满足了匹配条件,因此程序输出结果 [a1, a2, a3]。随后,b1输入,不满足条件;接下来a4输入。此时,a1、a2和a3依旧储存在状态中,因此依然可以参与匹配。匹配可以产生多个结果,但是有两个原则:

必须严格按照数据流入的顺序;

产生的结果必须包含当前元素;

原则 1 很好理解,由于数据的流入是按照 a1 -> a2 -> a3 -> a4 的顺序,所以结果生成的序列也必须按照这个顺序,不能删减中间数据,更不能打乱顺序。因此, [a1, a2, a4] 和 [a3, a2, a4, a1] 这种结果是不可能生成的。原则 2 就更好理解了,数据是因为 a4 的流入才产生的,再考虑到我们设定的量词条件是“三个及以上”,因此产生的结果只可能是 [a2, a3, a4] 和 [a1, a2, a3, a4]。

同理,如果我们在 Data.txt 最后加入一行 a5,则程序输出结果如下:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a1, a2, a3, a4]
[a2, a3, a4]
读取:a5
[a1, a2, a3, a4, a5]
[a2, a3, a4, a5]
[a3, a4, a5]

按照这种思路,如果我们继续加上 a6、a7、a8、……、a100,那么每个数据产生的结果会越来越多,因为 Flink CEP 会把所有符合条件的数据储存在状态里。「这样下去不行的,要不然内存养不起它的」。因此,oneOrMore() 和 timesOrMore() 之类的函数后面,一般都要跟上 until() 函数,从而指定终止条件。

2.1.4 把量词换成 times()
如果使用和上面一样的数据,但是把量词换成 times(3),会产生什么样的结果?我们首先修改代码:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() 
    @Override
    public boolean filter(String s, Context<String> context) 
        return s.startsWith("a");
    
).times(3);

由于固定了只匹配三个,再加上前文提到的两个原则的束缚,结果就很明显了:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a2, a3, a4]
读取:a5
[a3, a4, a5]

从 a1 到 b1 的逻辑完全相同,当读取到 a4 时,由于只匹配 3 个,同时结果必须包含 a4,因此产生的结果只能是 [a2, a3, a4] 。同理读取到 a5 后,由于结果必须包含 a5 且只匹配 3 个,所以结果只能是 [a3, a4, a5] 。这种情况下,过期的数据会被清理掉,妈妈再也不用担心我的内存不够用了。

除了固定参数,times() 函数还支持 times(from, to) 指定边界。这种情况下的匹配结果和上文类似,相信大家很容易就能推出来,在此我就不再赘述了。

2.1.5 使用严格模式
大家也许注意到,上文的 Data.txt 中,一直有一个讨厌的 b1。由于不满足我们的基本匹配条件,b1 直接被我们的程序忽略掉了。这是因为 Flink CEP 默认采用了不严格的匹配模式,而在某些情况下,这种数据是不能忽略的,这时候就可以使用 consecutive() 函数,指定严格的匹配模式。修改代码如下:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() 
    @Override
    public boolean filter(String s, Context<String> context) 
        return s.startsWith("a");
    
).times(3).consecutive();

运行程序,产生如下结果:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
读取:a5

此时,由于 a1、a2、a3 是紧密相连的,因此被成功匹配。而 a2、a3、a4 和 a3、a4、a5 中间由于多了一个 b1,在严格模式下不能被匹配。可以看出,严格模式下的匹配策略更像正则表达式。

2.2 多个 Pattern
一般而言,需要使用 CEP 的任务都得依靠多个 Pattern 才能解决。此时,可以使用 followedBy()、next() 等函数创建一个新的 Pattern,并按照不同的逻辑将新 Pattern 和前一个 Pattern 连接起来。

2.2.1 使用 followedBy() 创建一个新的 Pattern
我们再来看一下如何处理多个 Pattern,比如说我们需要匹配“包含 2-3 个 a 开头的字符串,同时包含 1-2 个 b 开头的字符串”的输入数据。

// 我们用 times(2,3) 来控制匹配 2-3 次;
// followBy 用于控制两个具有顺序的关系的 Pattern。
var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() 
    @Override
    public boolean filter(String s, Context<String> context) 
        return s.startsWith("a");
    
).times(2, 3).followedBy("middle").where(new IterativeCondition<String>() 
    @Override
    public boolean filter(String s, Context<String> context) throws Exception 
        return s.startsWith("b");
    
).times(1, 2);
 
CEP.pattern(stream, pattern).select(map -> 
    // 把匹配的结果装进 list 中。
    var list = map.get("start");
    list.addAll(map.get("middle"));
    return Arrays.toString(list.toArray());
).addSink(new SinkFunction<>() 
    @Override
    public void invoke(String value, Context context) 
        System.out.println(value);
    
);
 

这里我们使用了 followedBy () 函数,该函数创建了一个名为 “middle” 的新 Pattern,新 Pattern 中包含了指向原 Pattern 的引用。同样发生变化的是 select 函数中的 lambda 表达式。在表达式中,我们除了获取名为 “start” 的 Pattern 中的数据,还获取了名为 “middle” 的 Pattern 的数据,并将他们拼在一起。这与正则表达式中的子表达式特别类似,实际上,我们可以将每个 Pattern 近似看作一个子表达式,在读取结果的时候,使用 Pattern 的名字,从 map 中提取出结果。

数据的输入为:

a1
a2
a3
b1
a4
a5
b2

数据输出为:

读取:a1
读取:a2
读取:a3
读取:b1
[a1, a2, a3, b1]
[a1, a2, b1]
[a2, a3, b1]
读取:a4
读取:a5
读取:b2
[a1, a2, a3, b1, b2]
[a1, a2, b1, b2]
[a2, a3, a4, b2]
[a2, a3, b1, b2]
[a3, a4, a5, b2]
[a3, a4, b2]
[a4, a5, b2]

一下子产生了这么多数据,我一开始还是很懵的。接下来我们逐步分析下:

a1, a2 依次读入,不满足整体条件,但是满足 “start” 条件,且产生了 [a1, a2] 这一中间结果,存在状态中;

a3 读入,不满足整体条件,但是满足 “start” 条件,且产生了 [a2, a3] 和 [a1, a2, a3] 两个结果;

b1 读入,满足 “middle” 条件,产生 [b1] 中间结果。此时整体条件满足,因此和上述中间结果组合输出 [a1, a2, a3, b1] 、 [a1, a2, b1] 和 [a2, a3, b1] ;

a4 读入,继续满足 “start” 条件,产生 [a2, a3, a4] 和 [a3, a4];两个结果,但是由于这两个结果是在 b1 读入之后产生的,因此这两个结果不能和 [b1] 进行组合;

a5 读入,继续满足 “start” 条件,产生 [a3, a4, a5] 和 [a4, a5] 两个中间结果,同理不能和 [b1] 进行组合;

b2 读入,继续满足 “middle” 条件,产生 [b1, b2] 和 [b2] 两个中间结果。这里开始比较复杂了,需要严格结合时间顺序来分析。由于 b1 是在 a4 之前读入的,因此包含 b1 的序列 [b1, b2] 只能与 [a1, a2] 、 [a2, a3] 和 [a1, a2, a3] 进行关联。而 [b2] 则可以与包含了 a4 或 a5 的 [a2, a3, a4] 、 [a3, a4]、 [a3, a4, a5] 和 [a4, a5] 四个序列关联,因此此时输出结果如下:

[a1, a2, a3, b1, b2]    // [a1, a2, a3] 和 [b1, b2] 关联
[a1, a2, b1, b2]        // [a1, a2] 和 [b1, b2] 关联
[a2, a3, a4, b2]        // [a2, a3, a4] 和 [b2] 关联
[a2, a3, b1, b2]        // [a2, a3] 和 [b1, b2] 关联
[a3, a4, a5, b2]        // [a3, a4, a5] 和 [b2] 关联
[a3, a4, b2]            // [a3, a4] 和 [b2] 关联
[a4, a5, b2]            // [a4, a5] 和 [b2] 关联

那么有一个问题,为什么 [b2] 不能与 [a1, a2] 、 [a2, a3] 和 [a1, a2, a3] 进行关联呢?还是要站在时间序列的角度进行解释。因为只有 b1 是跟随在这三个元素后面的,所以只有包含 b1 的两个序列([b1] 和 [b1, b2])可以和它们进行关联,这就是 followedBy 的含义。为了验证这一观点,我们在 Data.txt 最后加上一个 b3,在其他代码均不变的情况下,最后读入 b3 后,输出如下结果:

[a2, a3, a4, b2, b3]
[a3, a4, a5, b2, b3]
[a3, a4, b2, b3]
[a4, a5, b2, b3]

分析如下:当读入 b3 后,满足 “middle” 条件,生成 [b2, b3] 和 [b3]。其中,只有 [b2, b3] 包含了 b2,由于 b2 是距离 [a2, a3, a4] 、 [a3, a4]、 [a3, a4, a5] 和 [a4, a5] 四个序列最近的数据,因此只有 [b2, b3] 才能和上述四个序列关联。而 [b3] 由于不包含 b2,因此无法和它们关联。

2.2.2 将 followedBy() 换成 next()
可以将 next () 看作是加强版的 followedBy ()。在 followedBy 中,两个 Pattern 直接允许不紧密连接,例如上文中的 [a1, a2] 和 [b1] ,他们中间隔了一个 a3. 这种数据在 next () 中会被丢弃掉。使用上文同样的数据(不包括 b3),将代码中的 followedBy 换成 next,修改如下:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() 
    @Override
    public boolean filter(String s, Context<String> context) 
        return s.startsWith("a");
    
).times(2, 3).next("middle").where(new IterativeCondition<String>() 
    @Override
    public boolean filter(String s, Context<String> context) throws Exception 
        return s.startsWith("b");
    
).times(1, 2);

运行后,看到如下结果:

读取:a1
读取:a2
读取:a3
读取:b1
[a1, a2, a3, b1]
[a2, a3, b1]
读取:a4
读取:a5
读取:b2
[a1, a2, a3, b1, b2]
[a2, a3, b1, b2]
[a3, a4, a5, b2]
[a4, a5, b2]

和之前的结果进行分析,发现结果中的 [a1, a2, b1] 、 [a1, a2, b1, b2]、 [a2, a3, a4, b2] 和 [a3, a4, b2] 均被排除,因为他们相比原序列,分别缺少了 a3、a3、a5、a5。

2.2.3 greedy() 做了什么
关于 greedy () 的用法,可以说是十分令人迷惑的。我看了许多文章,对 greedy () 的描述几乎都是一笔带过。描述大多是 “尽可能多的匹配”,但是实际上,大多数情况下加不加 greedy () 几乎没有任何区别。「因为 greedy () 虽然被归为量词 API,但是它实际上是在多个 Pattern 中才能起作用的。」 为此,我找到了 greedy () 的实现逻辑,在 NFACompiler 类的 updateWithGreedyCondition 方法中,代码如下:

private void updateWithGreedyCondition(
 State<T> state,
 IterativeCondition<T> takeCondition) 
 for (StateTransition<T> stateTransition : state.getStateTransitions()) 
  stateTransition.setCondition(
   new RichAndCondition<>(stateTransition.getCondition(), 
   new RichNotCondition<>(takeCondition)));
 

阅读代码,发现该方法实际上添加了一个逻辑:「确认当前条件满足转换到下一个 state 所需的条件,且不满足当前 state 的条件」。意思就是,如果当前处于 Pattern1,但是出现了一条同时满足两个 Pattern1 和 Pattern2 条件的数据,在不加 greedy () 的情况下,会跳转到 Pattern2,但是如果加了 greedy (),则会留在 Pattern1。下面我们来验证一下,编写如下代码:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() 
    @Override
    public boolean filter(String s, Context<String> context) 
        return s.startsWith("a");
    
).times(2, 3).next("middle").where(new IterativeCondition<String>() 
    @Override
    public boolean filter(String s, Context<String> context) throws Exception 
        return s.length() == 3;
    
).times(1, 2);

在这一代码中,如果一条数据为a开头,且长度为3,则同时满足“start”和“middle”。同时,为了方便区分数据到底属于哪个Pattern,我们在输出前加入分隔符:

CEP.pattern(stream, pattern).select(map -> 
    var list = map.get("start");
    list.add("|");
    list.addAll(map.get("middle"));
    return Arrays.toString(list.toArray());
).addSink(new SinkFunction<>() 
    @Override
    public void invoke(String value, Context context) 
        System.out.println(value);
    
);

准备如下数据

作业帮基于flink的实时计算平台实践

更多Flink相关技术问题,可扫码加入社区钉钉交流群~   戳我,查看原文视频~ 查看详情

flink模拟项目:实时热门商品统计(代码片段)

首先要实现的是实时热门商品统计,我们将会基于UserBehavior数据集来进行分析。项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。首先我们需要搭建项目框架。2.1创建Maven项目2.1.1项目... 查看详情

基于实时计算flink版的场景解决方案demo

...1a;https://developer.aliyun.com/learning/course/839本文主要分享两个基于Flink制作的实时大数据的应用。为了更好的体现应用的价值以及它所代表的典型的场景,这次的分享定制了两个接近现实生活中的 查看详情

快手基于flink构建实时数仓场景化实践

简介: 一文了解快手基于Flink构建的实时数仓架构,以及一些难题的解决方案。本文整理自快手数据技术专家李天朔在5月22日北京站FlinkMeetup分享的议题《快手基于Flink构建实时数仓场景化实践》,内容包括:快... 查看详情

基于spark和flink的电商数据分析项目(代码片段)

目录业务需求业务数据源用户访问Session分析Session聚合统计Session分层抽样Top10热门品类Top10活跃Session页面单跳转化率分析各区域热门商品统计分析广告点击流量实时统计分析总体流程实时黑名单广告点击实时统计统计每天各省top3... 查看详情

袋鼠云:基于flink构建实时计算平台的总体架构和关键技术点

...github和gitee上有一个有趣的开源项目:FlinkX,FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,也可以采集实时变化的数据,是全域、异构、批流一体的数据同步引擎。大家喜欢的话请给我们点个star!star... 查看详情

基于kafka的实时计算引擎:flink能否替代spark?

根据IBM的统计报告显示,过去两年内,当今世界上90%的数据产生源于新设备、传感器以及技术的出现,数据增长率也会为此加速。而从技术上将,这意味着大数据领域,处理这些数据将变得更加复杂和具有挑战性。例如移动应用... 查看详情

快手基于flink构建实时数仓场景化实践

...术专家李天朔在5月22日北京站FlinkMeetup分享的议题《快手基于Flink构建实时数仓场景化实践》,内容包括:快手实时计算场景快手实时数仓架构及保障措施快手场景问题及解决方案未来规划1.快手实时计算场景快手业务中... 查看详情

指标统计:基于流计算oceanus(flink)实现实时uvpv统计

作者:吴云涛,腾讯CSIG高级工程师导语|最近梳理了一下如何用Flink来实现实时的UV、PV指标的统计,并和公司内微视部门的同事交流。然后针对该场景做了简化,并发现使用FlinkSQL来实现这些指标的统计会更加便捷... 查看详情

实时监控:基于流计算oceanus(flink)实现系统和应用级实时监控(代码片段)

作者:吴云涛,腾讯 CSIG 高级工程师本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其App应用的CPU和内存等资源消耗数据,以短信、电话... 查看详情

aliexpress基于flink的广告实时数仓建设

...供低延时数据指标为目的供业务实时决策,本文主要介绍基于Flink的广告实时数仓建设,主要包括以下内容:1.建设背景2.技术架构3.数仓架构4. 实时OLAP5.实时保障6.未来规划建设背景广告是目前互联网流量变现的一种重要手段... 查看详情

flinkcep-flink的复杂事件处理(代码片段)

版本说明本文中以Flink1.16.1版本讲解说明Note:Flink1.16.1版本相较于之前版本增强的within函数,支持模式序列中相邻事件间的超时定义,以前版本只支持模式序列中第一个事件到最后一个事件之间的最大时间间隔。快速开始... 查看详情

Flink CEP 事件未触发

】FlinkCEP事件未触发【英文标题】:FlinkCEPEventNottriggering【发布时间】:2020-07-0607:01:59【问题描述】:我已经在Flink中实现了CEP模式,它按预期工作,连接到本地Kafka代理。但是当我连接到基于集群的云kafka设置时,FlinkCEP没有触发... 查看详情

基于flink构建实时数仓实践

...仓的建设变得越发重要起来。本文主要介绍用户增长业务基于Flink构建实时数仓的实践之路。实时数仓1.0介绍如下图是早期的实时计算架构,实时数据需求较 查看详情

快手基于flink构建实时数仓场景化实践

摘要:今天主要分享的内容是Flink在快手的实践和应用,分享日期:2021年5月22日。内容包括:快手实时计算场景快手实时数仓架构及保障措施快手场景问题及解决方案未来规划Tips:点击文末「阅读原文」即可回顾... 查看详情

实时数仓系列-网易云音乐基于flink+kafka的实时数仓建设实践

简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍Flink+Kafka在网易云音乐的应用实战:背景Flink+Kafka平台化设计Kafka在实时数仓中的应用问题&改进直播回放:https://developer.aliyun.com/li... 查看详情

美团基于flink的实时数仓平台建设新进展

传送门:Flink系统性学习笔记1.平台建设现状美团于2018年首次引入Flink实时计算引擎,当时的实时数仓概念还不太普及,平台只提供了FlinkJar任务的生命周期管理和监控报警。2019年,我们注意到实时计算的主要应用... 查看详情

快手基于flink构建实时数仓场景化实践

一、快手实时计算场景快手业务中的实时计算场景主要分为四块:公司级别的核心数据:包括公司经营大盘,实时核心日报,以及移动版数据。相当于团队会有公司的大盘指标,以及各个业务线,比如视频... 查看详情